From c806e7b32a6ffba5f08cb7ff0fffe8bb78051a16 Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Wed, 13 Apr 2016 09:46:24 +0300 Subject: [PATCH] Added Deletion to Bulletin Board Server and Local Client --- .../CachedBulletinBoardClient.java | 312 ++++++++++++++---- .../LocalBulletinBoardClient.java | 36 +- .../SingleServerBulletinBoardClient.java | 4 +- .../ThreadedBulletinBoardSubscriber.java | 9 +- .../GenericSubscriptionClientTester.java | 5 +- .../LocalBulletinBoardClientTest.java | 2 +- .../sqlserver/BulletinBoardSQLServer.java | 46 ++- .../sqlserver/H2QueryProvider.java | 53 ++- .../sqlserver/MySQLQueryProvider.java | 12 +- .../bulletinboard/BulletinBoardClient.java | 4 +- .../BulletinBoardMessageDeleter.java | 30 ++ .../DeletableBulletinBoardServer.java | 29 ++ ...etableSubscriptionBulletinBoardClient.java | 7 + .../SubscriptionAsyncBulletinBoardClient.java | 7 - .../SubscriptionBulletinBoardClient.java | 7 + 15 files changed, 448 insertions(+), 115 deletions(-) create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/DeletableBulletinBoardServer.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/DeletableSubscriptionBulletinBoardClient.java delete mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionBulletinBoardClient.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index 96ba76d..6aac297 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -1,168 +1,338 @@ package meerkat.bulletinboard; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Executors; /** * Created by Arbel Deutsch Peled on 03-Mar-16. * This is a full-fledged implementation of a Bulletin Board Client * It provides asynchronous access to several remote servers, as well as a local cache - * Read/write operations are performed on the local server + * Read operations are performed on the local server + * Batch reads are performed on the local server and, if they fail, also on the remote servers + * Write operations are performed first on the local server and then on the remotes * After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated * The database also employs a synchronizer which makes sure local data is sent to the remote servers */ -public class CachedBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { +public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient { - private final BulletinBoardClient localClient; + private final AsyncBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; private BulletinBoardSubscriber subscriber; - private final int threadPoolSize; - private final long failDelayInMilliseconds; - private final long subscriptionIntervalInMilliseconds; + private class SubscriptionStoreCallback implements FutureCallback> { - public CachedBulletinBoardClient(BulletinBoardClient localClient, - int threadPoolSize, - long failDelayInMilliseconds, - long subscriptionIntervalInMilliseconds) - throws IllegalAccessException, InstantiationException { + private final FutureCallback callback; + + public SubscriptionStoreCallback(){ + callback = null; + } + + public SubscriptionStoreCallback(FutureCallback callback){ + this.callback = callback; + } + + @Override + public void onSuccess(List result) { + for (BulletinBoardMessage msg : result) { + try { + localClient.postMessage(msg); + } catch (CommunicationException ignored) { + // TODO: log + } + } + } + + @Override + public void onFailure(Throwable t) { + if (callback != null) { + callback.onFailure(t); // This is some hard error that cannot be dealt with + } + } + + } + + /** + * Creates a Cached Client + * Assumes all parameters are initialized + * @param localClient is a Client for the local instance + * @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations + * @param subscriber is a subscription service to the remote instance(s) + */ + public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, + AsyncBulletinBoardClient remoteClient, + BulletinBoardSubscriber subscriber) { this.localClient = localClient; - this.threadPoolSize = threadPoolSize; - this.failDelayInMilliseconds = failDelayInMilliseconds; - this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; - - remoteClient = new ThreadedBulletinBoardClient(); + this.remoteClient = remoteClient; + this.subscriber = subscriber; } @Override - public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { - return null; - } + public MessageID postMessage(final BulletinBoardMessage msg, final FutureCallback callback) { - @Override - public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { - return null; - } + return localClient.postMessage(msg, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postMessage(msg, callback); + } - @Override - public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + public MessageID postBatch(final CompleteBatch completeBatch, final FutureCallback callback) { + + return localClient.postBatch(completeBatch, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postBatch(completeBatch, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { + public void beginBatch(final BeginBatchMessage beginBatchMessage, final FutureCallback callback) { + + localClient.beginBatch(beginBatchMessage, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.beginBatch(beginBatchMessage, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void postBatchData(ByteString signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + public void postBatchData(final byte[] signerId, final int batchId, final List batchDataList, + final int startPosition, final FutureCallback callback) { + + localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { + public void postBatchData(final byte[] signerId, final int batchId, final List batchDataList, final FutureCallback callback) { + + localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postBatchData(signerId, batchId, batchDataList, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { + public void postBatchData(final ByteString signerId, final int batchId, final List batchDataList, + final int startPosition, final FutureCallback callback) { + + localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + + } + + @Override + public void postBatchData(final ByteString signerId, final int batchId, final List batchDataList, final FutureCallback callback) { + + localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.postBatchData(signerId, batchId, batchDataList, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + + } + + @Override + public void closeBatch(final CloseBatchMessage closeBatchMessage, final FutureCallback callback) { + + localClient.closeBatch(closeBatchMessage, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + remoteClient.closeBatch(closeBatchMessage, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override public void getRedundancy(MessageID id, FutureCallback callback) { - } - - @Override - public void readMessages(MessageFilterList filterList, FutureCallback> callback) { + remoteClient.getRedundancy(id, callback); } @Override - public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { + public void readMessages(MessageFilterList filterList, final FutureCallback> callback) { + + localClient.readMessages(filterList, callback); + + subscriber.subscribe(filterList, new SubscriptionStoreCallback(callback)); + + } + + @Override + public void readBatch(final BatchSpecificationMessage batchSpecificationMessage, final FutureCallback callback) { + + localClient.readBatch(batchSpecificationMessage, new FutureCallback() { + @Override + public void onSuccess(CompleteBatch result) { + callback.onSuccess(result); // Read from local client was successful + } + + @Override + public void onFailure(Throwable t) { + + // Read from local unsuccessful: try to read from remote + + remoteClient.readBatch(batchSpecificationMessage, new FutureCallback() { + + @Override + public void onSuccess(CompleteBatch result) { + + // Read from remote was successful: store in local and return result + + localClient.postBatch(result, new FutureCallback() { + @Override + public void onSuccess(Boolean result) {} + @Override + public void onFailure(Throwable t) {} + }); + + callback.onSuccess(result); + + } + + @Override + public void onFailure(Throwable t) { + + // Read from remote was unsuccessful: report error + callback.onFailure(t); + + } + + }); + + } + + }); } @Override public void querySync(SyncQuery syncQuery, FutureCallback callback) { + localClient.querySync(syncQuery, callback); + } @Override - public void init(BulletinBoardClientParams clientParams) { - - remoteClient.init(clientParams); - - ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); - - List subscriberClients = new ArrayList<>(clientParams.getBulletinBoardAddressCount()); - - for (String address : clientParams.getBulletinBoardAddressList()){ - - SubscriptionAsyncBulletinBoardClient newClient = - new SingleServerBulletinBoardClient(executorService, failDelayInMilliseconds, subscriptionIntervalInMilliseconds); - - newClient.init(clientParams.toBuilder().clearBulletinBoardAddress().addBulletinBoardAddress(address).build()); - - subscriberClients.add(newClient); - - } - - subscriber = new ThreadedBulletinBoardSubscriber(subscriberClients, localClient); - - } + /** + * This is a stub method + * All resources are assumed to be initialized + */ + public void init(BulletinBoardClientParams clientParams) {} @Override public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { - return null; + localClient.postMessage(msg); + return remoteClient.postMessage(msg); } @Override public float getRedundancy(MessageID id) { - return 0; + return remoteClient.getRedundancy(id); } @Override public List readMessages(MessageFilterList filterList) { - return null; + subscriber.subscribe(filterList, new SubscriptionStoreCallback()); + return localClient.readMessages(filterList); } @Override - public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { - return null; + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + return localClient.generateSyncQuery(generateSyncQueryParams); } @Override public void close() { - + localClient.close(); + remoteClient.close(); } @Override public void subscribe(MessageFilterList filterList, FutureCallback> callback) { - + subscriber.subscribe(filterList, callback); } @Override public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + subscriber.subscribe(filterList, startEntry, callback); + } + + public int syncStatus(){ + return 0; + } + + public void reSync(){ } -} + +} \ No newline at end of file diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index df3e196..1f56690 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit; /** * Created by Arbel Deutsch Peled on 15-Mar-16. - * This client is to be used mainly for testing. - * It wraps a BulletinBoardServer in an asynchronous client. + * This client wraps a BulletinBoardServer in an asynchronous client. + * It is meant to be used as a local cache handler and for testing purposes. * This means the access to the server is direct (via method calls) instead of through a TCP connection. * The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously. */ -public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient{ +public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBoardClient { - private final BulletinBoardServer server; + private final DeletableBulletinBoardServer server; private final ListeningScheduledExecutorService executorService; private final BatchDigest digest; private final int subsrciptionDelay; @@ -40,7 +40,7 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC * @param threadNum is the number of concurrent threads to allocate for the client * @param subscriptionDelay is the required delay between subscription calls in milliseconds */ - public LocalBulletinBoardClient(BulletinBoardServer server, int threadNum, int subscriptionDelay) { + public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) { this.server = server; this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum)); this.digest = new GenericBatchDigest(new SHA256Digest()); @@ -517,8 +517,30 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC } @Override - public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { - return server.generateSyncQuery(GenerateSyncQueryParams); + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + return server.generateSyncQuery(generateSyncQueryParams); + } + + @Override + public void deleteMessage(MessageID msgID, FutureCallback callback) { + + try { + callback.onSuccess(server.deleteMessage(msgID).getValue()); + } catch (CommunicationException e) { + callback.onFailure(e); + } + + } + + @Override + public void deleteMessage(long entryNum, FutureCallback callback) { + + try { + callback.onSuccess(server.deleteMessage(entryNum).getValue()); + } catch (CommunicationException e) { + callback.onFailure(e); + } + } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 34531cf..f12430d 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -7,12 +7,10 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.util.BulletinBoardUtils; -import javax.ws.rs.NotFoundException; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -30,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; * If the list of servers contains more than one server: the server actually used is the first one * The class further implements a delayed access to the server after a communication error occurs */ -public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { +public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient { private final int MAX_RETRIES = 11; diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java index cf8d47d..ca8ef84 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -8,7 +8,6 @@ import meerkat.util.BulletinBoardUtils; import static meerkat.protobuf.BulletinBoardAPI.FilterType.*; -import java.sql.Time; import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; @@ -19,11 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber { - protected final Collection clients; + protected final Collection clients; protected final BulletinBoardClient localClient; - protected Iterator clientIterator; - protected SubscriptionAsyncBulletinBoardClient currentClient; + protected Iterator clientIterator; + protected SubscriptionBulletinBoardClient currentClient; private long lastServerSwitchTime; @@ -32,7 +31,7 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f}; - public ThreadedBulletinBoardSubscriber(Collection clients, BulletinBoardClient localClient) { + public ThreadedBulletinBoardSubscriber(Collection clients, BulletinBoardClient localClient) { this.clients = clients; this.localClient = localClient; diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java index a91f8d6..4a5fe62 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -17,7 +17,6 @@ import java.util.*; import java.util.concurrent.Semaphore; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.startsWith; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -38,7 +37,7 @@ public class GenericSubscriptionClientTester { private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; - private SubscriptionAsyncBulletinBoardClient bulletinBoardClient; + private SubscriptionBulletinBoardClient bulletinBoardClient; private Random random; private BulletinBoardMessageGenerator generator; @@ -46,7 +45,7 @@ public class GenericSubscriptionClientTester { private Semaphore jobSemaphore; private Vector thrown; - public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){ + public GenericSubscriptionClientTester(SubscriptionBulletinBoardClient bulletinBoardClient){ this.bulletinBoardClient = bulletinBoardClient; diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index d2039ae..0ab5c67 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -48,7 +48,7 @@ public class LocalBulletinBoardClientTest { throw new CommunicationException(e.getCause() + " " + e.getMessage()); } - BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); + DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); server.init(DB_NAME); LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index ab82ab1..cebeb8e 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -6,6 +6,7 @@ import java.util.*; import com.google.protobuf.*; import com.google.protobuf.Timestamp; +import com.sun.org.apache.xpath.internal.operations.Bool; import meerkat.bulletinboard.*; import meerkat.bulletinboard.sqlserver.mappers.*; import static meerkat.bulletinboard.BulletinBoardConstants.*; @@ -16,6 +17,7 @@ import meerkat.comm.MessageOutputStream; import meerkat.crypto.concrete.ECDSASignature; import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Crypto.SignatureVerificationKey; @@ -38,7 +40,7 @@ import org.springframework.jdbc.support.KeyHolder; /** * This is a generic SQL implementation of the BulletinBoardServer API. */ -public class BulletinBoardSQLServer implements BulletinBoardServer{ +public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ /** * This interface provides the required implementation-specific data to enable an access to an actual SQL server. @@ -67,6 +69,16 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ new int[] {Types.BLOB, Types.TIMESTAMP, Types.BLOB} ), + DELETE_MSG_BY_ENTRY( + new String[] {"EntryNum"}, + new int[] {Types.INTEGER} + ), + + DELETE_MSG_BY_ID( + new String[] {"MsgId"}, + new int[] {Types.BLOB} + ), + INSERT_NEW_TAG( new String[] {"Tag"}, new int[] {Types.VARCHAR} @@ -529,6 +541,38 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return postMessage(msg, true); // Perform a post and check the signature for authenticity } + @Override + public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException { + + String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ID); + Map namedParameters = new HashMap(); + + namedParameters.put(QueryType.DELETE_MSG_BY_ID.getParamName(0),msgID); + + int affectedRows = jdbcTemplate.update(sql, namedParameters); + + //TODO: Log + + return BoolMsg.newBuilder().setValue(affectedRows > 0).build(); + + } + + @Override + public BoolMsg deleteMessage(long entryNum) throws CommunicationException { + + String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ENTRY); + Map namedParameters = new HashMap(); + + namedParameters.put(QueryType.DELETE_MSG_BY_ENTRY.getParamName(0),entryNum); + + int affectedRows = jdbcTemplate.update(sql, namedParameters); + + //TODO: Log + + return BoolMsg.newBuilder().setValue(affectedRows > 0).build(); + + } + /** * This is a container class for and SQL string builder and a MapSqlParameterSource to be used with it diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java index 44a55da..bef6d68 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java @@ -30,19 +30,28 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider switch(queryType) { case ADD_SIGNATURE: - return "INSERT INTO SignatureTable (EntryNum, SignerId, Signature)" - + " SELECT DISTINCT :EntryNum AS Entry, :SignerId AS Id, :Signature AS Sig FROM UtilityTable AS Temp" - + " WHERE NOT EXISTS" - + " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.SignerId = :SignerId AND SubTable.EntryNum = :EntryNum)"; + return MessageFormat.format( + "INSERT INTO SignatureTable (EntryNum, SignerId, Signature)" + + " SELECT DISTINCT :{0} AS Entry, :{1} AS Id, :{2} AS Sig FROM UtilityTable AS Temp" + + " WHERE NOT EXISTS" + + " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.EntryNum = :{0} AND SubTable.SignerId = :{1})", + QueryType.ADD_SIGNATURE.getParamName(0), + QueryType.ADD_SIGNATURE.getParamName(1), + QueryType.ADD_SIGNATURE.getParamName(2)); case CONNECT_TAG: - return "INSERT INTO MsgTagTable (TagId, EntryNum)" - + " SELECT DISTINCT TagTable.TagId, :EntryNum AS NewEntry FROM TagTable WHERE Tag = :Tag" - + " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId" - + " AND SubTable.EntryNum = :EntryNum)"; + return MessageFormat.format( + "INSERT INTO MsgTagTable (TagId, EntryNum)" + + " SELECT DISTINCT TagTable.TagId, :{0} AS NewEntry FROM TagTable WHERE Tag = :{1}" + + " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId" + + " AND SubTable.EntryNum = :{0})", + QueryType.CONNECT_TAG.getParamName(0), + QueryType.CONNECT_TAG.getParamName(1)); case FIND_MSG_ID: - return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId"; + return MessageFormat.format( + "SELECT EntryNum From MsgTable WHERE MsgId = :{0}", + QueryType.FIND_MSG_ID.getParamName(0)); case FIND_TAG_ID: return MessageFormat.format( @@ -59,14 +68,32 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable"; case GET_SIGNATURES: - return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum"; + return MessageFormat.format( + "SELECT Signature FROM SignatureTable WHERE EntryNum = :{0}", + QueryType.GET_SIGNATURES.getParamName(0)); case INSERT_MSG: - return "INSERT INTO MsgTable (MsgId, Msg, ExactTime) VALUES(:MsgId,:Msg,:TimeStamp)"; + return MessageFormat.format( + "INSERT INTO MsgTable (MsgId, ExactTime, Msg) VALUES(:{0}, :{1}, :{2})", + QueryType.INSERT_MSG.getParamName(0), + QueryType.INSERT_MSG.getParamName(1), + QueryType.INSERT_MSG.getParamName(2)); + + case DELETE_MSG_BY_ENTRY: + return MessageFormat.format( + "DELETE FROM MsgTable WHERE EntryNum = :{0}", + QueryType.DELETE_MSG_BY_ENTRY.getParamName(0)); + + case DELETE_MSG_BY_ID: + return MessageFormat.format( + "DELETE FROM MsgTable WHERE MsgId = :{0}", + QueryType.DELETE_MSG_BY_ID.getParamName(0)); case INSERT_NEW_TAG: - return "INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE" - + " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :Tag)"; + return MessageFormat.format( + "INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE" + + " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :{0})", + QueryType.INSERT_NEW_TAG.getParamName(0)); case GET_LAST_MESSAGE_ENTRY: return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable"; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java index adf96a4..ad3df68 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java @@ -1,7 +1,5 @@ package meerkat.bulletinboard.sqlserver; -import com.mysql.jdbc.jdbc2.optional.MysqlDataSource; -import meerkat.bulletinboard.BulletinBoardConstants; import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider; import meerkat.protobuf.BulletinBoardAPI.FilterType; import org.apache.commons.dbcp2.BasicDataSource; @@ -81,6 +79,16 @@ public class MySQLQueryProvider implements SQLQueryProvider { QueryType.INSERT_MSG.getParamName(1), QueryType.INSERT_MSG.getParamName(2)); + case DELETE_MSG_BY_ENTRY: + return MessageFormat.format( + "DELETE IGNORE FROM MsgTable WHERE EntryNum = :{0}", + QueryType.DELETE_MSG_BY_ENTRY.getParamName(0)); + + case DELETE_MSG_BY_ID: + return MessageFormat.format( + "DELETE IGNORE FROM MsgTable WHERE MsgId = :{0}", + QueryType.DELETE_MSG_BY_ID.getParamName(0)); + case INSERT_NEW_TAG: return MessageFormat.format( "INSERT IGNORE INTO TagTable(Tag) VALUES (:{0})", diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 245eddf..9d7d5b8 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -47,12 +47,12 @@ public interface BulletinBoardClient { /** * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list * Should only be called on instances for which the actual server contacted is known (i.e. there is only one server) - * @param GenerateSyncQueryParams defines the required information needed to generate the query + * @param generateSyncQueryParams defines the required information needed to generate the query * These are represented as fractions of the total number of relevant messages * @return The generated SyncQuery * @throws CommunicationException when no DB can be contacted */ - SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException; + SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException; /** * Closes all connections, if any. diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java new file mode 100644 index 0000000..6025719 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java @@ -0,0 +1,30 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +/** + * Created by Arbel Deutsch Peled on 13-Apr-16. + * This interface is meant to extend a BulletinBoardClient interface/class + * It provides it with the ability to delete messages from the Server + * This assumes the Server implements the {@link DeletableBulletinBoardServer} + */ +public interface BulletinBoardMessageDeleter { + + /** + * Deletes a message from a Bulletin Board Server + * @param msgID is the ID of the message to delete + * @param callback handles the result of the operation + */ + public void deleteMessage(MessageID msgID, FutureCallback callback); + + /** + * Deletes a message from the Bulletin Board + * Logs this action + * @param entryNum is the serial entry number of the message to delete + * @param callback handles the result of the operation + */ + public void deleteMessage(long entryNum, FutureCallback callback); + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableBulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableBulletinBoardServer.java new file mode 100644 index 0000000..03e0c3f --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableBulletinBoardServer.java @@ -0,0 +1,29 @@ +package meerkat.bulletinboard; + +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +/** + * Created by Arbel Deutsch Peled on 13-Apr-16. + */ +public interface DeletableBulletinBoardServer extends BulletinBoardServer { + + /** + * Deletes a message from the Bulletin Board + * Logs this action + * @param msgID is the ID of the message to delete + * @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist + * @throws CommunicationException in case of an error + */ + public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException; + + /** + * Deletes a message from the Bulletin Board + * Logs this action + * @param entryNum is the serial entry number of the message to delete + * @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist + * @throws CommunicationException in case of an error + */ + public BoolMsg deleteMessage(long entryNum) throws CommunicationException; + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableSubscriptionBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableSubscriptionBulletinBoardClient.java new file mode 100644 index 0000000..acc29fe --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/DeletableSubscriptionBulletinBoardClient.java @@ -0,0 +1,7 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 13-Apr-16. + */ +public interface DeletableSubscriptionBulletinBoardClient extends SubscriptionBulletinBoardClient, BulletinBoardMessageDeleter { +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java deleted file mode 100644 index b07e655..0000000 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java +++ /dev/null @@ -1,7 +0,0 @@ -package meerkat.bulletinboard; - -/** - * Created by Arbel Deutsch Peled on 03-Mar-16. - */ -public interface SubscriptionAsyncBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber { -} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionBulletinBoardClient.java new file mode 100644 index 0000000..5577bac --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionBulletinBoardClient.java @@ -0,0 +1,7 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + */ +public interface SubscriptionBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber { +}