From 4c33e923b20c4ff69fef136be1172fafbd0cf5c7 Mon Sep 17 00:00:00 2001 From: "arbel.peled" Date: Sat, 16 Apr 2016 19:50:09 +0300 Subject: [PATCH] Implemented Synchronizer and Cached Client Not tested yet --- .../CachedBulletinBoardClient.java | 23 +++++-- .../LocalBulletinBoardClient.java | 47 +++++++++------ .../SimpleBulletinBoardClient.java | 52 ++++++++++++++-- .../SimpleBulletinBoardSynchronizer.java | 60 +++++++++++++++---- .../SingleServerBulletinBoardClient.java | 15 ++--- .../bulletinboard/BulletinBoardClient.java | 8 +++ .../BulletinBoardMessageDeleter.java | 23 ++++++- 7 files changed, 177 insertions(+), 51 deletions(-) diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index c8b26c7..ac61114 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -14,7 +14,8 @@ import java.util.List; * It provides asynchronous access to several remote servers, as well as a local cache * Read operations are performed on the local server * Batch reads are performed on the local server and, if they fail, also on the remote servers - * Write operations are performed first on the local server and then on the remotes + * Write operations are performed on the local server + * A Synchronizer is employed in order to keep the remote server up to date * After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated * The database also employs a synchronizer which makes sure local data is sent to the remote servers */ @@ -23,6 +24,9 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien private final AsyncBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; private BulletinBoardSubscriber subscriber; + private BulletinBoardSynchronizer synchronizer; + + private Thread syncThread; private class SubscriptionStoreCallback implements FutureCallback> { @@ -62,15 +66,21 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien * @param localClient is a Client for the local instance * @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations * @param subscriber is a subscription service to the remote instance(s) + * @param queue is a client for a local deletable server to be used as a queue for not-yet-uploaded messages */ public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, - BulletinBoardSubscriber subscriber) { + BulletinBoardSubscriber subscriber, + DeletableSubscriptionBulletinBoardClient queue) { this.localClient = localClient; this.remoteClient = remoteClient; this.subscriber = subscriber; + this.synchronizer = new SimpleBulletinBoardSynchronizer(); + synchronizer.init(queue, remoteClient); + syncThread = new Thread(synchronizer); + syncThread.start(); } @Override @@ -302,8 +312,12 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien @Override public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { - localClient.postMessage(msg); - return remoteClient.postMessage(msg); + return localClient.postMessage(msg); + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException { + return localClient.postBatch(completeBatch); } @Override @@ -331,6 +345,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien public void close() { localClient.close(); remoteClient.close(); + synchronizer.stop(); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index 30ae462..109d5ed 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -57,7 +57,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override - public Boolean call() throws Exception { + public Boolean call() throws CommunicationException { return server.postMessage(msg).getValue(); } @@ -83,25 +83,21 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override - public Boolean call() throws Exception { + public Boolean call() throws CommunicationException { - if (!server.beginBatch(completeBatch.getBeginBatchMessage()).getValue()) - return false; + server.beginBatch(completeBatch.getBeginBatchMessage()); + + BatchMessage.Builder builder = BatchMessage.newBuilder() + .setSignerId(completeBatch.getSignature().getSignerId()) + .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()); int i=0; for (BatchData data : completeBatch.getBatchDataList()){ - BatchMessage message = BatchMessage.newBuilder() - .setSignerId(completeBatch.getSignature().getSignerId()) - .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) - .setSerialNum(i) - .setData(data) - .build(); - - if (!server.postBatchMessage(message).getValue()) - return false; + server.postBatchMessage(builder.setSerialNum(i).setData(data).build()); i++; + } return server.closeBatchMessage(completeBatch.getCloseBatchMessage()).getValue(); @@ -501,17 +497,22 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { - try { - MessagePoster poster = new MessagePoster(msg); poster.call(); digest.update(msg); return digest.digestAsMessageID(); - } catch (Exception e) { - return null; - } + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException { + + CompleteBatchPoster poster = new CompleteBatchPoster(completeBatch); + poster.call(); + + digest.update(completeBatch); + return digest.digestAsMessageID(); } @@ -609,6 +610,16 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } + @Override + public boolean deleteMessage(MessageID msgID) throws CommunicationException { + return server.deleteMessage(msgID).getValue(); + } + + @Override + public boolean deleteMessage(long entryNum) throws CommunicationException { + return server.deleteMessage(entryNum).getValue(); + } + @Override public void close() { try { diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 9f6eae7..28252ae 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -2,8 +2,7 @@ package meerkat.bulletinboard; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; -import meerkat.bulletinboard.workers.singleserver.SingleServerReadBatchWorker; -import meerkat.bulletinboard.workers.singleserver.SingleServerReadMessagesWorker; +import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; import meerkat.comm.MessageInputStream; import meerkat.crypto.Digest; @@ -38,7 +37,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ protected Client client; - protected Digest digest; + protected BatchDigest digest; /** * Stores database locations and initializes the web Client @@ -53,7 +52,8 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ client.register(ProtobufMessageBodyReader.class); client.register(ProtobufMessageBodyWriter.class); - digest = new SHA256Digest(); + // Wrap the Digest into a BatchDigest + digest = new GenericBatchDigest(new SHA256Digest()); } @@ -92,6 +92,50 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); } + @Override + public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException { + + int pos = 0; + ByteString signerID = completeBatch.getSignature().getSignerId(); + int batchID = completeBatch.getBeginBatchMessage().getBatchId(); + + // Post message to all databases + try { + for (String db : meerkatDBs) { + + SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0); + + beginBatchWorker.call(); + + BatchMessage.Builder builder = BatchMessage.newBuilder().setSignerId(signerID).setBatchId(batchID); + + for (BatchData batchData : completeBatch.getBatchDataList()) { + + SingleServerPostBatchWorker postBatchWorker = + new SingleServerPostBatchWorker( + db, + builder.setData(batchData).setSerialNum(pos).build(), + 0); + + postBatchWorker.call(); + + pos++; + + } + + SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(db, completeBatch.getCloseBatchMessage(), 0); + + closeBatchWorker.call(); + + } + } catch (Exception e) { // Occurs only when server replies with valid status but invalid data + throw new CommunicationException("Error accessing database: " + e.getMessage()); + } + + digest.update(completeBatch); + return digest.digestAsMessageID(); + } + /** * Access each database and search for a given message ID * Return the number of databases in which the message was found diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java index 1eed944..d96864a 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -26,6 +26,27 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance(); private static final int SLEEP_INTERVAL = 10000; // 10 Seconds + private class MessageDeleteCallback implements FutureCallback { + + private final long entryNum; + + public MessageDeleteCallback(long entryNum) { + this.entryNum = entryNum; + } + + @Override + public void onSuccess(Boolean result) { + // Success: delete from database + localClient.deleteMessage(entryNum, null); + } + + @Override + public void onFailure(Throwable t) { + // Ignore + } + + } + private class SyncCallback implements FutureCallback> { @Override @@ -41,27 +62,40 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize for (BulletinBoardMessage message : result){ - if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)){ + try { - // This is a batch message: need to upload batch data as well as the message itself - ByteString signerId = message.getSig(0).getSignerId(); - long batchID = Long.parseLong(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX)); + if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)) { - BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder().build(); + // This is a batch message: need to upload batch data as well as the message itself + ByteString signerID = message.getSig(0).getSignerId(); + int batchID = Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX)); - localClient.readBatch(batchSpecificationMessage, null); + BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder() + .setSignerId(signerID) + .setBatchId(batchID) + .setStartPosition(0) + .build(); - } - else{ - // This is a regular message: post it - try { + CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage); + + remoteClient.postBatch(completeBatch); + + + } else { + + // This is a regular message: post it + remoteClient.postMessage(message); - } catch (CommunicationException e) { - newStatus = SyncStatus.SERVER_ERROR; + } + localClient.deleteMessage(message.getEntryNum()); + + } catch (CommunicationException e) { + updateSyncStatus(SyncStatus.SERVER_ERROR); } + } } @@ -144,7 +178,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void run() { - if (syncStatus != SyncStatus.STOPPED) { + if (syncStatus == SyncStatus.STOPPED) { updateSyncStatus(SyncStatus.PENDING); SyncCallback callback = new SyncCallback(); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 0a43562..74a8162 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -34,8 +34,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i private ListeningScheduledExecutorService executorService; - protected BatchDigest batchDigest; - private long lastServerErrorTime; private final long failDelayInMilliseconds; @@ -347,9 +345,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Perform usual setup super.init(clientParams); - // Wrap the Digest into a BatchDigest - batchDigest = new GenericBatchDigest(digest); - // Remove all but first DB address String dbAddress = meerkatDBs.get(0); meerkatDBs = new LinkedList<>(); @@ -367,9 +362,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i scheduleWorker(worker, new RetryCallback<>(worker, callback)); // Calculate the correct message ID and return it - batchDigest.reset(); - batchDigest.update(msg.getMsg()); - return batchDigest.digestAsMessageID(); + digest.reset(); + digest.update(msg.getMsg()); + return digest.digestAsMessageID(); } @@ -435,9 +430,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i new BeginBatchCallback(completeBatch, callback) ); - batchDigest.update(completeBatch); + digest.update(completeBatch); - return batchDigest.digestAsMessageID(); + return digest.digestAsMessageID(); } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 91e577d..4fb526a 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -27,6 +27,14 @@ public interface BulletinBoardClient { */ MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException; + /** + * Perform an end-to-end post of a signed batch message + * @param completeBatch contains all the data of the batch including the meta-data and the signature + * @return a unique identifier for the batch message + * @throws CommunicationException + */ + public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException; + /** * Check how "safe" a given message is in a synchronous manner * @param id is the unique message identifier for retrieval diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java index 6025719..cf57975 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardMessageDeleter.java @@ -13,18 +13,37 @@ import meerkat.protobuf.BulletinBoardAPI.*; public interface BulletinBoardMessageDeleter { /** - * Deletes a message from a Bulletin Board Server + * Deletes a message from a Bulletin Board Server in a possibly asynchronous manner + * Logs this action * @param msgID is the ID of the message to delete * @param callback handles the result of the operation */ public void deleteMessage(MessageID msgID, FutureCallback callback); /** - * Deletes a message from the Bulletin Board + * Deletes a message from the Bulletin Board in a possibly asynchronous manner * Logs this action * @param entryNum is the serial entry number of the message to delete * @param callback handles the result of the operation */ public void deleteMessage(long entryNum, FutureCallback callback); + /** + * Deletes a message from a Bulletin Board Server in a synchronous manner + * Logs this action + * @param msgID is the ID of the message to delete + * @return TRUE if the message was deleted and FALSE if it did not exist on the server + * @throws CommunicationException when an error occurs + */ + public boolean deleteMessage(MessageID msgID) throws CommunicationException; + + /** + * Deletes a message from the Bulletin Board in a synchronous manner + * Logs this action + * @param entryNum is the serial entry number of the message to delete + * @return TRUE if the message was deleted and FALSE if it did not exist on the server + * @throws CommunicationException when an error occurs + */ + public boolean deleteMessage(long entryNum) throws CommunicationException; + }