From 9ed728fca7637473b5d825d35e2fbccdde55e69c Mon Sep 17 00:00:00 2001 From: "arbel.peled" Date: Thu, 14 Apr 2016 09:20:11 +0300 Subject: [PATCH] Added message counting ability to the server (but not to the client) Added synchronous CompleteBatch read by the client Started implementing the synchronizer Added support for null callbacks --- .../CachedBulletinBoardClient.java | 48 +++-- .../LocalBulletinBoardClient.java | 82 +++++++- .../bulletinboard/MultiServerWorker.java | 6 +- .../SimpleBulletinBoardClient.java | 65 ++++-- .../SimpleBulletinBoardSynchronizer.java | 188 ++++++++++++++++++ .../SingleServerBulletinBoardClient.java | 42 ++-- .../ThreadedBulletinBoardSubscriber.java | 13 +- .../sqlserver/BulletinBoardSQLServer.java | 42 ++-- .../webapp/BulletinBoardWebApp.java | 12 +- .../bulletinboard/BulletinBoardClient.java | 10 +- .../bulletinboard/BulletinBoardConstants.java | 1 + .../bulletinboard/BulletinBoardServer.java | 10 +- .../BulletinBoardSynchronizer.java | 74 ++++++- .../meerkat/bulletinboard/CompleteBatch.java | 33 ++- 14 files changed, 532 insertions(+), 94 deletions(-) create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index 6aac297..c8b26c7 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -84,7 +84,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -101,7 +102,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -118,7 +120,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -136,7 +139,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -153,7 +157,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -171,7 +176,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -188,7 +194,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -205,7 +212,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } }); @@ -233,7 +241,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien localClient.readBatch(batchSpecificationMessage, new FutureCallback() { @Override public void onSuccess(CompleteBatch result) { - callback.onSuccess(result); // Read from local client was successful + if (callback != null) + callback.onSuccess(result); // Read from local client was successful } @Override @@ -255,7 +264,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien public void onFailure(Throwable t) {} }); - callback.onSuccess(result); + if (callback != null) + callback.onSuccess(result); } @@ -263,7 +273,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien public void onFailure(Throwable t) { // Read from remote was unsuccessful: report error - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } @@ -301,11 +312,16 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien } @Override - public List readMessages(MessageFilterList filterList) { + public List readMessages(MessageFilterList filterList) throws CommunicationException { subscriber.subscribe(filterList, new SubscriptionStoreCallback()); return localClient.readMessages(filterList); } + @Override + public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException { + return localClient.readBatch(batchSpecificationMessage); + } + @Override public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { return localClient.generateSyncQuery(generateSyncQueryParams); @@ -327,12 +343,4 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien subscriber.subscribe(filterList, startEntry, callback); } - public int syncStatus(){ - return 0; - } - - public void reSync(){ - - } - } \ No newline at end of file diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index 1f56690..30ae462 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -291,6 +291,31 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } + private class BatchDataReader implements Callable> { + + private final BatchSpecificationMessage batchSpecificationMessage; + + public BatchDataReader(BatchSpecificationMessage batchSpecificationMessage) { + this.batchSpecificationMessage = batchSpecificationMessage; + } + + @Override + public List call() throws Exception { + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream outputStream = new MessageOutputStream<>(byteOutputStream); + server.readBatch(batchSpecificationMessage, outputStream); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BatchData.class); + + return inputStream.asList(); + + } + } + @Override public void readMessages(MessageFilterList filterList, FutureCallback> callback) { Futures.addCallback(executorService.submit(new MessageReader(filterList)), callback); @@ -310,7 +335,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo public void onSuccess(List result) { // Report new messages to user - callback.onSuccess(result); + if (callback != null) + callback.onSuccess(result); MessageFilterList.Builder filterBuilder = filterList.toBuilder(); @@ -339,7 +365,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo public void onFailure(Throwable t) { // Notify caller about failure and terminate subscription - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } @@ -503,7 +530,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } @Override - public List readMessages(MessageFilterList filterList) { + public List readMessages(MessageFilterList filterList) throws CommunicationException{ try { @@ -511,7 +538,40 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo return reader.call(); } catch (Exception e){ - return null; + throw new CommunicationException("Error reading from server"); + } + + } + + @Override + public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException { + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_TAG) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId()) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(batchSpecificationMessage.getSignerId()) + .build()) + .build(); + + BulletinBoardMessage batchMessage = readMessages(filterList).get(0); + + BatchDataReader batchDataReader = new BatchDataReader(batchSpecificationMessage); + + try { + + List batchDataList = batchDataReader.call(); + return new CompleteBatch(batchMessage, batchDataList); + + } catch (Exception e) { + throw new CommunicationException("Error reading batch"); } } @@ -525,9 +585,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo public void deleteMessage(MessageID msgID, FutureCallback callback) { try { - callback.onSuccess(server.deleteMessage(msgID).getValue()); + Boolean deleted = server.deleteMessage(msgID).getValue(); + if (callback != null) + callback.onSuccess(deleted); } catch (CommunicationException e) { - callback.onFailure(e); + if (callback != null) + callback.onFailure(e); } } @@ -536,9 +599,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo public void deleteMessage(long entryNum, FutureCallback callback) { try { - callback.onSuccess(server.deleteMessage(entryNum).getValue()); + Boolean deleted = server.deleteMessage(entryNum).getValue(); + if (callback != null) + callback.onSuccess(deleted); } catch (CommunicationException e) { - callback.onFailure(e); + if (callback != null) + callback.onFailure(e); } } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java index 7347f47..60327fa 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java @@ -74,7 +74,8 @@ public abstract class MultiServerWorker extends BulletinClientWorker extends BulletinClientWorker readMessages(MessageFilterList filterList) { - WebTarget webTarget; - Response response; - BulletinBoardMessageList messageList; - // Replace null filter list with blank one. if (filterList == null){ - filterList = MessageFilterList.newBuilder().build(); + filterList = MessageFilterList.getDefaultInstance(); } for (String db : meerkatDBs) { try { - webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); - response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF)); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(db, filterList, 0); - messageList = response.readEntity(BulletinBoardMessageList.class); + List result = worker.call(); - if (messageList != null){ - return messageList.getMessageList(); - } + return result; - } catch (Exception e) {} + } catch (Exception ignored) {} + } + return null; + + } + + @Override + public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException { + + // Create job with no retries for retrieval of the Bulletin Board Message that defines the batch + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_TAG) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId()) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(batchSpecificationMessage.getSignerId()) + .build()) + .build(); + + for (String db : meerkatDBs) { + + try { + SingleServerReadMessagesWorker messagesWorker = new SingleServerReadMessagesWorker(db, filterList, 0); + + List messages = messagesWorker.call(); + + if (messages == null || messages.size() < 1) + continue; + + BulletinBoardMessage batchMessage = messages.get(0); + + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(db, batchSpecificationMessage, 0); + + List batchDataList = batchWorker.call(); + + CompleteBatch result = new CompleteBatch(batchMessage, batchDataList); + + return result; + + } catch (Exception ignored) {} } return null; diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java new file mode 100644 index 0000000..1eed944 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -0,0 +1,188 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardUtils; + +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel on 13/04/2016. + * Simple, straightforward implementation of the {@link BulletinBoardSynchronizer} interface + */ +public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronizer { + + private DeletableSubscriptionBulletinBoardClient localClient; + private AsyncBulletinBoardClient remoteClient; + + private volatile SyncStatus syncStatus; + + private List> messageCountCallbacks; + private List> syncStatusCallbacks; + + private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance(); + private static final int SLEEP_INTERVAL = 10000; // 10 Seconds + + private class SyncCallback implements FutureCallback> { + + @Override + public void onSuccess(List result) { + + SyncStatus newStatus = SyncStatus.PENDING; + + if (result.size() == 0) { + newStatus = SyncStatus.SYNCHRONIZED; + } + + else{ // Upload messages + + for (BulletinBoardMessage message : result){ + + if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)){ + + // This is a batch message: need to upload batch data as well as the message itself + ByteString signerId = message.getSig(0).getSignerId(); + long batchID = Long.parseLong(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX)); + + BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder().build(); + + localClient.readBatch(batchSpecificationMessage, null); + + } + else{ + + // This is a regular message: post it + try { + remoteClient.postMessage(message); + } catch (CommunicationException e) { + newStatus = SyncStatus.SERVER_ERROR; + } + + } + } + + } + + updateSyncStatus(newStatus); + + } + + @Override + public void onFailure(Throwable t) { + + updateSyncStatus(SyncStatus.SERVER_ERROR); + + } + + } + + public SimpleBulletinBoardSynchronizer() { + this.syncStatus = SyncStatus.STOPPED; + } + + private synchronized void updateSyncStatus(SyncStatus newStatus) { + + if (newStatus != syncStatus){ + + syncStatus = newStatus; + + for (FutureCallback callback : syncStatusCallbacks){ + if (callback != null) + callback.onSuccess(syncStatus); + } + + } + + } + + @Override + public void init(DeletableSubscriptionBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient) { + + updateSyncStatus(SyncStatus.STOPPED); + + this.localClient = localClient; + this.remoteClient = remoteClient; + + messageCountCallbacks = new LinkedList<>(); + syncStatusCallbacks = new LinkedList<>(); + + } + + @Override + public SyncStatus getSyncStatus() { + return syncStatus; + } + + @Override + public void subscribeToSyncStatus(FutureCallback callback) { + syncStatusCallbacks.add(callback); + } + + @Override + public List getRemainingMessages() throws CommunicationException{ + return localClient.readMessages(EMPTY_FILTER); + } + + @Override + public void getRemainingMessages(FutureCallback> callback) { + localClient.readMessages(EMPTY_FILTER, callback); + } + + @Override + public long getRemainingMessagesCount() throws CommunicationException { + return localClient.readMessages(EMPTY_FILTER).size(); + } + + @Override + public void subscribeToRemainingMessagesCount(FutureCallback callback) { + messageCountCallbacks.add(callback); + } + + @Override + public void run() { + + if (syncStatus != SyncStatus.STOPPED) { + + updateSyncStatus(SyncStatus.PENDING); + SyncCallback callback = new SyncCallback(); + + while (syncStatus != SyncStatus.STOPPED) { + + try { + + do { + localClient.readMessages(EMPTY_FILTER, callback); + } while (syncStatus == SyncStatus.PENDING); + + synchronized (this) { + this.wait(SLEEP_INTERVAL); + } + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + } + + } + + @Override + public void nudge() { + synchronized (this) { + this.notify(); + } + } + + @Override + public void stop() { + + updateSyncStatus(SyncStatus.STOPPED); + + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index f12430d..0a43562 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -97,7 +97,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void onSuccess(T result) { - futureCallback.onSuccess(result); + if (futureCallback != null) + futureCallback.onSuccess(result); } @Override @@ -115,7 +116,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i scheduleWorker(worker, this); } else { // No more retries: notify caller about failure - futureCallback.onFailure(t); + if (futureCallback != null) + futureCallback.onFailure(t); } } @@ -150,7 +152,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } if (batchDataRemaining.decrementAndGet() == 0){ - callback.onSuccess(this.aggregatedResult.get()); + if (callback != null) + callback.onSuccess(this.aggregatedResult.get()); } } @@ -158,7 +161,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void onFailure(Throwable t) { // Notify caller about failure - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } @@ -195,26 +199,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i if (remainingQueries.decrementAndGet() == 0){ - String batchIdStr = BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX); - - if (batchIdStr == null){ - callback.onFailure(new CommunicationException("Server returned invalid message with no Batch ID tag")); - } - - BeginBatchMessage beginBatchMessage = - BeginBatchMessage.newBuilder() - .setSignerId(batchMessage.getSig(0).getSignerId()) - .setBatchId(Integer.parseInt(batchIdStr)) - .addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(prefixes))) - .build(); - callback.onSuccess(new CompleteBatch(beginBatchMessage, batchDataList, batchMessage.getSig(0))); + if (callback != null) + callback.onSuccess(new CompleteBatch(batchMessage, batchDataList)); } } protected void fail(Throwable t) { if (failed.compareAndSet(false, true)) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } @@ -289,7 +283,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void onSuccess(List result) { // Report new messages to user - callback.onSuccess(result); + if (callback != null) + callback.onSuccess(result); // Remove last filter from list (MIN_ENTRY one) filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); @@ -315,7 +310,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i fail(); // Notify caller about failure and terminate subscription - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } @@ -397,7 +393,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } @@ -425,7 +422,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void onFailure(Throwable t) { - callback.onFailure(t); + if (callback != null) + callback.onFailure(t); } } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java index ca8ef84..9568255 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -29,6 +29,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber private AtomicBoolean isSyncInProgress; private Semaphore rescheduleSemaphore; + private AtomicBoolean stopped; + private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f}; public ThreadedBulletinBoardSubscriber(Collection clients, BulletinBoardClient localClient) { @@ -44,6 +46,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber isSyncInProgress = new AtomicBoolean(false); rescheduleSemaphore = new Semaphore(1); + stopped = new AtomicBoolean(false); + } /** @@ -131,7 +135,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber //TODO: log - callback.onFailure(e); // Hard error: Cannot guarantee subscription safety + if (callback != null) + callback.onFailure(e); // Hard error: Cannot guarantee subscription safety } @@ -217,7 +222,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber public void onSuccess(List result) { // Propagate result to caller - callback.onSuccess(result); + if (callback != null) + callback.onSuccess(result); // Renew subscription @@ -249,7 +255,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber public void onSuccess(List result) { // Propagate result to caller - callback.onSuccess(result); + if (callback != null) + callback.onSuccess(result); } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index cebeb8e..6b2ef4f 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -679,6 +679,30 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ } + @Override + public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException { + + BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder(); + + // SQL length is roughly 50 characters per filter + 50 for the query itself + StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1)); + + // Check if Tag/Signature tables are required for filtering purposes + + sqlBuilder.append(sqlQueryProvider.getSQLString(QueryType.COUNT_MESSAGES)); + + // Get conditions + + SQLAndParameters sqlAndParameters = getSQLFromFilters(filterList); + sqlBuilder.append(sqlAndParameters.sql); + + // Run query and stream the output using a MessageCallbackHandler + + List count = jdbcTemplate.query(sqlBuilder.toString(), sqlAndParameters.parameters, new LongMapper()); + return IntMsg.newBuilder().setValue(count.get(0).intValue()).build(); + + } + /** * This method returns a string representation of the tag associated with a batch ID @@ -713,23 +737,9 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ .build()) .build(); - // SQL length is roughly 50 characters per filter + 50 for the query itself - StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1)); + int count = getMessageCount(filterList).getValue(); - // Check if Tag/Signature tables are required for filtering purposes - - sqlBuilder.append(sqlQueryProvider.getSQLString(QueryType.COUNT_MESSAGES)); - - // Get conditions - - SQLAndParameters sqlAndParameters = getSQLFromFilters(filterList); - sqlBuilder.append(sqlAndParameters.sql); - - // Run query and stream the output using a MessageCallbackHandler - - List count = jdbcTemplate.query(sqlBuilder.toString(), sqlAndParameters.parameters, new LongMapper()); - - return (count.size() > 0) && (count.get(0) > 0); + return count > 0; } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index 7c0f7fa..74b5f2a 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -15,14 +15,12 @@ import meerkat.bulletinboard.sqlserver.MySQLQueryProvider; import meerkat.bulletinboard.sqlserver.SQLiteQueryProvider; import meerkat.comm.CommunicationException; import meerkat.comm.MessageOutputStream; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import static meerkat.bulletinboard.BulletinBoardConstants.*; import static meerkat.rest.Constants.*; import java.io.IOException; import java.io.OutputStream; -import java.util.Collection; /** * An implementation of the BulletinBoardServer which functions as a WebApp @@ -99,6 +97,16 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL bulletinBoard.readMessages(filterList, out); } + @Path(COUNT_MESSAGES_PATH) + @POST + @Consumes(MEDIATYPE_PROTOBUF) + @Produces(MEDIATYPE_PROTOBUF) + @Override + public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException { + init(); + return bulletinBoard.getMessageCount(filterList); + } + @Path(READ_MESSAGES_PATH) @POST diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 9d7d5b8..91e577d 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -42,7 +42,15 @@ public interface BulletinBoardClient { * @param filterList return only messages that match the filters (null means no filtering) * @return the list of messages */ - List readMessages(MessageFilterList filterList); + List readMessages(MessageFilterList filterList) throws CommunicationException; + + /** + * Read a given batch message from the bulletin board + * @param batchSpecificationMessage contains the data required to specify a single batch instance + * @return the complete batch + * @throws CommunicationException + */ + CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException; /** * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java index 9ddee90..f2fb3a9 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java @@ -10,6 +10,7 @@ public interface BulletinBoardConstants { public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver"; public static final String GENERATE_SYNC_QUERY_PATH = "/generatesyncquery"; public static final String READ_MESSAGES_PATH = "/readmessages"; + public static final String COUNT_MESSAGES_PATH = "/countmessages"; public static final String READ_BATCH_PATH = "/readbatch"; public static final String POST_MESSAGE_PATH = "/postmessage"; public static final String BEGIN_BATCH_PATH = "/beginbatch"; diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index e458dbc..e26c54a 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -32,13 +32,21 @@ public interface BulletinBoardServer{ public BoolMsg postMessage(BulletinBoardMessage msg) throws CommunicationException; /** - * Read all messages posted matching the given filter + * Read all posted messages matching the given filters * @param filterList return only messages that match the filters (empty list or null means no filtering) * @param out is an output stream into which the matching messages are written * @throws CommunicationException on DB connection error */ public void readMessages(MessageFilterList filterList, MessageOutputStream out) throws CommunicationException; + /** + * Return the number of posted messages matching the given filters + * @param filterList count only messages that match the filters (empty list or null means no filtering) + * @return an IntMsg containing the number of messages that match the filter + * @throws CommunicationException on DB connection error + */ + public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException; + /** * Informs server about a new batch message * @param message contains the required data about the new batch diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java index 4b07225..569d4ef 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java @@ -1,22 +1,84 @@ package meerkat.bulletinboard; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +import com.google.common.util.concurrent.FutureCallback; + +import java.util.List; + /** * Created by Arbel Deutsch Peled on 08-Mar-16. * This interface defines the behaviour of a bulletin board synchronizer * This is used to make sure that data in a specific instance of a bulletin board server is duplicated to a sufficient percentage of the other servers */ -public interface BulletinBoardSynchronizer extends Runnable{ +public interface BulletinBoardSynchronizer extends Runnable { + + public enum SyncStatus{ + SYNCHRONIZED, // No more messages to upload + PENDING, // Synchronizer is uploading data + SERVER_ERROR, // Synchronizer encountered an error while uploading + STOPPED // Stopped/Not started by user + } /** - * - * @param localClient is a client for the local DB instance - * @param remoteClient is a client for the remote DBs - * @param minRedundancy + * Initializes the synchronizer with the required data to function properly + * @param localClient is a client for the temporary local storage server which contains only data to be uploaded + * @param remoteClient is a client for the remote servers into which the data needs to be uploaded */ - public void init(BulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, float minRedundancy); + public void init(DeletableSubscriptionBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient); + /** + * Returns the current server synchronization status + * @return the current synchronization status + */ + public SyncStatus getSyncStatus(); + + /** + * Creates a subscription to sync status changes + * @param callback is the handler for any status changes + */ + public void subscribeToSyncStatus(FutureCallback callback); + + /** + * Returns the messages which have not yet been synchronized + * @return the list of messages remaining to be synchronized + */ + public List getRemainingMessages() throws CommunicationException; + + /** + * Asynchronously returns the messages which have not yet been synchronized + * @param callback is the handler for the list of messages + */ + public void getRemainingMessages(FutureCallback> callback); + + /** + * Returns the current number of unsynchronized messages + * @return the current synchronization status + */ + public long getRemainingMessagesCount() throws CommunicationException; + + /** + * Creates a subscription to changes in the number of unsynchronized messages + * @param callback is the handler for any status changes + */ + public void subscribeToRemainingMessagesCount(FutureCallback callback); + + /** + * Starts the synchronization + */ @Override public void run(); + /** + * Lets the Synchronizer know that there is new data to be uploaded + * This is used to reduce the latency between local data-writes and uploads to the remote servers + */ + public void nudge(); + + /** + * Stops the synchronization + */ + public void stop(); } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java index 649fd8b..89ae80a 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java @@ -3,8 +3,9 @@ package meerkat.bulletinboard; import com.google.protobuf.Timestamp; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.*; -import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardUtils; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -49,6 +50,36 @@ public class CompleteBatch { this.timestamp = timestamp; } + /** + * Combines the actual Bulletin board representation of a batch into a CompleteBatch object + * @param batchMessage is the BulletinBoard message that specifies the batch + * @param batchDataList is the (ordered) list of batch data + */ + public CompleteBatch(BulletinBoardMessage batchMessage, List batchDataList) throws IllegalArgumentException{ + + final String[] PREFIXES = { + BulletinBoardConstants.BATCH_ID_TAG_PREFIX, + BulletinBoardConstants.BATCH_TAG}; + + String batchIdStr = BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX); + + if (batchIdStr == null){ + throw new IllegalArgumentException(""); + } + + this.beginBatchMessage = + BeginBatchMessage.newBuilder() + .setSignerId(batchMessage.getSig(0).getSignerId()) + .setBatchId(Integer.parseInt(batchIdStr)) + .addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(PREFIXES))) + .build(); + + this.batchDataList = batchDataList; + this.signature = batchMessage.getSig(0); + this.timestamp = batchMessage.getMsg().getTimestamp(); + + } + public BeginBatchMessage getBeginBatchMessage() { return beginBatchMessage; }