diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BatchDataContainer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BatchDataContainer.java new file mode 100644 index 0000000..53f4870 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BatchDataContainer.java @@ -0,0 +1,25 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import meerkat.protobuf.BulletinBoardAPI.BatchData; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 17-Jan-16. + * Used to store the complete data required for sending a batch data list inside a single object + */ +public class BatchDataContainer { + + public final byte[] signerId; + public final int batchId; + public final List batchDataList; + public final int startPosition; + + public BatchDataContainer(byte[] signerId, int batchId, List batchDataList, int startPosition) { + this.signerId = signerId; + this.batchId = batchId; + this.batchDataList = batchDataList; + this.startPosition = startPosition; + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index e92f35b..9c211fd 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -5,9 +5,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; -import meerkat.bulletinboard.workers.SingleServerGetRedundancyWorker; -import meerkat.bulletinboard.workers.SingleServerPostMessageWorker; -import meerkat.bulletinboard.workers.SingleServerReadMessagesWorker; +import meerkat.bulletinboard.workers.singleserver.*; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.BulletinBoardClientParams; @@ -30,6 +28,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i protected ListeningScheduledExecutorService executorService; + protected BatchDigest batchDigest; + private long lastServerErrorTime; protected final long failDelayInMilliseconds; @@ -136,6 +136,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Perform usual setup super.init(clientParams); + // Wrap the Digest into a BatchDigest + batchDigest = new GenericBatchDigest(digest); + // Remove all but first DB address String dbAddress = meerkatDBs.get(0); meerkatDBs = new LinkedList(); @@ -153,19 +156,124 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i scheduleWorker(worker, new RetryCallback(worker, callback)); // Calculate the correct message ID and return it - digest.reset(); - digest.update(msg.getMsg()); - return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); + batchDigest.reset(); + batchDigest.update(msg.getMsg()); + return batchDigest.digestAsMessageID(); } + private class PostBatchDataCallback implements ClientCallback { + + private CompleteBatch completeBatch; + ClientCallback callback; + + public PostBatchDataCallback(CompleteBatch completeBatch, ClientCallback callback) { + this.completeBatch = completeBatch; + this.callback = callback; + } + + @Override + public void handleCallback(Boolean msg) { + closeBatch( + CloseBatchMessage.newBuilder() + .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) + .setSig(completeBatch.getSignature()) + .setBatchLength(completeBatch.getBatchDataList().size()) + .build(), + callback + ); + } + + @Override + public void handleFailure(Throwable t) { + callback.handleFailure(t); + } + + } + + private class BeginBatchCallback implements ClientCallback { + + private CompleteBatch completeBatch; + ClientCallback callback; + + public BeginBatchCallback(CompleteBatch completeBatch, ClientCallback callback) { + this.completeBatch = completeBatch; + this.callback = callback; + } + + @Override + public void handleCallback(Boolean msg) { + + postBatchData( + completeBatch.getBeginBatchMessage().getSignerId(), + completeBatch.getBeginBatchMessage().getBatchId(), + completeBatch.getBatchDataList(), + 0, + new PostBatchDataCallback(completeBatch,callback)); + } + + @Override + public void handleFailure(Throwable t) { + callback.handleFailure(t); + } + } + @Override public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { - return null; + + beginBatch( + completeBatch.getBeginBatchMessage(), + new BeginBatchCallback(completeBatch, callback) + ); + + batchDigest.update(completeBatch); + + return batchDigest.digestAsMessageID(); + } @Override - public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback) { + public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback) { + + // Create worker with redundancy 1 and MAX_RETRIES retries + SingleServerBeginBatchWorker worker = + new SingleServerBeginBatchWorker(meerkatDBs.get(0), beginBatchMessage, MAX_RETRIES); + + // Submit worker and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback) { + + BatchMessage.Builder builder = BatchMessage.newBuilder() + .setSignerId(signerId) + .setBatchId(batchId); + + // Iterate through data list + + for (BatchData data : batchDataList) { + builder.setSerialNum(startPosition).setData(data); + + // Create worker with redundancy 1 and MAX_RETRIES retries + SingleServerPostBatchWorker worker = + new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES); + + // Create worker with redundancy 1 and MAX_RETRIES retries + scheduleWorker(worker, new RetryCallback(worker, callback)); + + // Increment position in batch + startPosition++; + } + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback) { + + postBatchData(signerId, batchId, batchDataList, 0, callback); } @@ -173,15 +281,26 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, ClientCallback callback) { + postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback); + } @Override public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { + postBatchData(signerId, batchId, batchDataList, 0, callback); + } @Override - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + + // Create worker with redundancy 1 and MAX_RETRIES retries + SingleServerCloseBatchWorker worker = + new SingleServerCloseBatchWorker(meerkatDBs.get(0), closeBatchMessage, MAX_RETRIES); + + // Submit worker and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); } @@ -208,7 +327,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void readBatch(byte[] signerId, int batchId, ClientCallback callback) { + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback) { + + // Create job with no retries + SingleServerReadBatchWorker worker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1); + + // Submit job and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index ce7a01e..78d5dba 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -2,9 +2,7 @@ package meerkat.bulletinboard; import com.google.protobuf.ByteString; -import meerkat.bulletinboard.workers.MultiServerGetRedundancyWorker; -import meerkat.bulletinboard.workers.MultiServerPostMessageWorker; -import meerkat.bulletinboard.workers.MultiServerReadMessagesWorker; +import meerkat.bulletinboard.workers.multiserver.*; import meerkat.comm.CommunicationException; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; @@ -31,6 +29,8 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple // Per-server clients List clients; + BatchDigest batchDigest; + private final static int POST_MESSAGE_RETRY_NUM = 3; private final static int READ_MESSAGES_RETRY_NUM = 1; private final static int GET_REDUNDANCY_RETRY_NUM = 1; @@ -51,6 +51,8 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple super.init(clientParams); + batchDigest = new GenericBatchDigest(digest); + minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount()); executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM); @@ -84,42 +86,88 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple executorService.submit(worker); // Calculate the correct message ID and return it - digest.reset(); - digest.update(msg.getMsg()); - return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); + batchDigest.reset(); + batchDigest.update(msg.getMsg()); + return batchDigest.digestAsMessageID(); } @Override public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { - return null; // TODO: write this + // Create job + MultiServerPostBatchWorker worker = + new MultiServerPostBatchWorker(clients, minAbsoluteRedundancy, completeBatch, POST_MESSAGE_RETRY_NUM, callback); + + // Submit job + executorService.submit(worker); + + // Calculate the correct message ID and return it + batchDigest.reset(); + batchDigest.update(completeBatch); + return batchDigest.digestAsMessageID(); } @Override - public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback) { - // TODO: write this + public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback) { + + // Create job + MultiServerBeginBatchWorker worker = + new MultiServerBeginBatchWorker(clients, minAbsoluteRedundancy, beginBatchMessage, POST_MESSAGE_RETRY_NUM, callback); + + // Submit job + executorService.submit(worker); + } @Override public void postBatchData(byte[] signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback) { + int startPosition, ClientCallback callback) { - // TODO: write this + BatchDataContainer batchDataContainer = new BatchDataContainer(signerId, batchId, batchDataList, startPosition); + + // Create job + MultiServerPostBatchDataWorker worker = + new MultiServerPostBatchDataWorker(clients, minAbsoluteRedundancy, batchDataContainer, POST_MESSAGE_RETRY_NUM, callback); + + // Submit job + executorService.submit(worker); } @Override public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { - postBatchData(signerId, batchId, batchDataList, 0, callback); // Write batch from beginning + postBatchData(signerId, batchId, batchDataList, 0, callback); } @Override - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { - // TODO: write this + public void postBatchData(ByteString signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback) { + + postBatchData(signerId.toByteArray(), batchId, batchDataList, startPosition, callback); + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback) { + + postBatchData(signerId, batchId, batchDataList, 0, callback); + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + + // Create job + MultiServerCloseBatchWorker worker = + new MultiServerCloseBatchWorker(clients, minAbsoluteRedundancy, closeBatchMessage, POST_MESSAGE_RETRY_NUM, callback); + + // Submit job + executorService.submit(worker); + } /** @@ -158,8 +206,15 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public void readBatch(byte[] signerId, int batchId, ClientCallback callback) { - // TODO: Implement + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback) { + + // Create job + MultiServerReadBatchWorker worker = + new MultiServerReadBatchWorker(clients, minAbsoluteRedundancy, batchSpecificationMessage, READ_MESSAGES_RETRY_NUM, callback); + + // Submit job + executorService.submit(worker); + } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java deleted file mode 100644 index cfc34e7..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java +++ /dev/null @@ -1,7 +0,0 @@ -package meerkat.bulletinboard.workers; - -/** - * Created by Arbel Deutsch Peled on 27-Dec-15. - */ -public class MultiServerPostBatchWorker { -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java new file mode 100644 index 0000000..dc496d7 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java @@ -0,0 +1,28 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerBeginBatchWorker extends MultiServerGenericPostWorker { + + public MultiServerBeginBatchWorker(List clients, + int minServers, BeginBatchMessage payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doPost(SingleServerBulletinBoardClient client, BeginBatchMessage payload) { + client.beginBatch(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java new file mode 100644 index 0000000..56b09c5 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java @@ -0,0 +1,28 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker { + + public MultiServerCloseBatchWorker(List clients, + int minServers, CloseBatchMessage payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doPost(SingleServerBulletinBoardClient client, CloseBatchMessage payload) { + client.closeBatch(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java similarity index 82% rename from bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java rename to bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java index dcdc82a..8b62d4e 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java @@ -1,10 +1,9 @@ -package meerkat.bulletinboard.workers; +package meerkat.bulletinboard.workers.multiserver; import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; import meerkat.bulletinboard.MultiServerWorker; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI.*; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; @@ -14,16 +13,18 @@ import java.util.List; /** * Created by Arbel Deutsch Peled on 27-Dec-15. */ -public class MultiServerPostMessageWorker extends MultiServerWorker { +public abstract class MultiServerGenericPostWorker extends MultiServerWorker { - public MultiServerPostMessageWorker(List clients, - int minServers, BulletinBoardMessage payload, int maxRetry, + public MultiServerGenericPostWorker(List clients, + int minServers, T payload, int maxRetry, ClientCallback clientCallback) { super(clients, minServers, payload, maxRetry, clientCallback); } + protected abstract void doPost(SingleServerBulletinBoardClient client, T payload); + /** * This method carries out the actual communication with the servers via HTTP Post * It accesses the servers one by one and tries to post the payload to each in turn @@ -48,7 +49,7 @@ public class MultiServerPostMessageWorker extends MultiServerWorker>{ +public abstract class MultiServerGenericReadWorker extends MultiServerWorker{ - private Iterator clientIterator; + protected Iterator clientIterator; - public MultiServerReadMessagesWorker(List clients, - int minServers, MessageFilterList payload, int maxRetry, - ClientCallback> clientCallback) { + public MultiServerGenericReadWorker(List clients, + int minServers, IN payload, int maxRetry, + ClientCallback clientCallback) { super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load @@ -27,6 +26,8 @@ public class MultiServerReadMessagesWorker extends MultiServerWorker msg) { + public void handleCallback(OUT msg) { succeed(msg); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java similarity index 97% rename from bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java rename to bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java index 4d5e7f2..5675cb8 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java @@ -1,4 +1,4 @@ -package meerkat.bulletinboard.workers; +package meerkat.bulletinboard.workers.multiserver; import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; import meerkat.bulletinboard.MultiServerWorker; diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchDataWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchDataWorker.java new file mode 100644 index 0000000..df21f9f --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchDataWorker.java @@ -0,0 +1,28 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.bulletinboard.BatchDataContainer; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerPostBatchDataWorker extends MultiServerGenericPostWorker { + + public MultiServerPostBatchDataWorker(List clients, + int minServers, BatchDataContainer payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doPost(SingleServerBulletinBoardClient client, BatchDataContainer payload) { + client.postBatchData(payload.signerId, payload.batchId, payload.batchDataList, payload.startPosition, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java new file mode 100644 index 0000000..7c2f586 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java @@ -0,0 +1,28 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.CompleteBatch; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerPostBatchWorker extends MultiServerGenericPostWorker { + + public MultiServerPostBatchWorker(List clients, + int minServers, CompleteBatch payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doPost(SingleServerBulletinBoardClient client, CompleteBatch payload) { + client.postBatch(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java new file mode 100644 index 0000000..33f9a3c --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java @@ -0,0 +1,28 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerPostMessageWorker extends MultiServerGenericPostWorker { + + public MultiServerPostMessageWorker(List clients, + int minServers, BulletinBoardMessage payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doPost(SingleServerBulletinBoardClient client, BulletinBoardMessage payload) { + client.postMessage(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java new file mode 100644 index 0000000..737c15c --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java @@ -0,0 +1,30 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.CompleteBatch; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.protobuf.BulletinBoardAPI.BatchSpecificationMessage; + +import java.util.List; + + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker { + + public MultiServerReadBatchWorker(List clients, + int minServers, BatchSpecificationMessage payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doRead(BatchSpecificationMessage payload, SingleServerBulletinBoardClient client) { + client.readBatch(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java new file mode 100644 index 0000000..b276eab --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java @@ -0,0 +1,29 @@ +package meerkat.bulletinboard.workers.multiserver; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.List; + + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerReadMessagesWorker extends MultiServerGenericReadWorker>{ + + public MultiServerReadMessagesWorker(List clients, + int minServers, MessageFilterList payload, int maxRetry, + ClientCallback> clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + @Override + protected void doRead(MessageFilterList payload, SingleServerBulletinBoardClient client) { + client.readMessages(payload, this); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerBeginBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerBeginBatchWorker.java new file mode 100644 index 0000000..0c1a1a3 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerBeginBatchWorker.java @@ -0,0 +1,17 @@ +package meerkat.bulletinboard.workers.singleserver; + +import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage; + +import static meerkat.bulletinboard.BulletinBoardConstants.BEGIN_BATCH_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a post operation + */ +public class SingleServerBeginBatchWorker extends SingleServerGenericPostWorker { + + public SingleServerBeginBatchWorker(String serverAddress, BeginBatchMessage payload, int maxRetry) { + super(serverAddress, BEGIN_BATCH_PATH, payload, maxRetry); + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerCloseBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerCloseBatchWorker.java new file mode 100644 index 0000000..ab298a5 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerCloseBatchWorker.java @@ -0,0 +1,17 @@ +package meerkat.bulletinboard.workers.singleserver; + +import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage; + +import static meerkat.bulletinboard.BulletinBoardConstants.CLOSE_BATCH_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a close batch operation + */ +public class SingleServerCloseBatchWorker extends SingleServerGenericPostWorker { + + public SingleServerCloseBatchWorker(String serverAddress, CloseBatchMessage payload, int maxRetry) { + super(serverAddress, CLOSE_BATCH_PATH, payload, maxRetry); + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenericPostWorker.java similarity index 76% rename from bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java rename to bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenericPostWorker.java index ee9fd3f..c56af05 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenericPostWorker.java @@ -1,9 +1,8 @@ -package meerkat.bulletinboard.workers; +package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; import meerkat.protobuf.BulletinBoardAPI.BoolMsg; -import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; import meerkat.rest.Constants; import javax.ws.rs.ProcessingException; @@ -13,16 +12,18 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; -import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH; /** * Created by Arbel Deutsch Peled on 27-Dec-15. * Tries to contact server once and perform a post operation */ -public class SingleServerPostMessageWorker extends SingleServerWorker { +public class SingleServerGenericPostWorker extends SingleServerWorker { - public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) { + private String subPath; + + public SingleServerGenericPostWorker(String serverAddress, String subPath, T payload, int maxRetry) { super(serverAddress, payload, maxRetry); + this.subPath = subPath; } /** @@ -30,13 +31,13 @@ public class SingleServerPostMessageWorker extends SingleServerWorker { + + public SingleServerPostBatchWorker(String serverAddress, BatchMessage payload, int maxRetry) { + super(serverAddress, POST_BATCH_PATH, payload, maxRetry); + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerPostMessageWorker.java new file mode 100644 index 0000000..454d720 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerPostMessageWorker.java @@ -0,0 +1,17 @@ +package meerkat.bulletinboard.workers.singleserver; + +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; + +import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a post operation + */ +public class SingleServerPostMessageWorker extends SingleServerGenericPostWorker { + + public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) { + super(serverAddress, POST_MESSAGE_PATH, payload, maxRetry); + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java new file mode 100644 index 0000000..61556fc --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java @@ -0,0 +1,119 @@ +package meerkat.bulletinboard.workers.singleserver; + +import meerkat.bulletinboard.CompleteBatch; +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; +import java.util.List; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.READ_BATCH_PATH; + +import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_ID_TAG_PREFIX; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class SingleServerReadBatchWorker extends SingleServerWorker { + + public SingleServerReadBatchWorker(String serverAddress, BatchSpecificationMessage payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + /** + * This method carries out the actual communication with the server via HTTP Post + * Upon successful retrieval from the server the method returns the received values + * @return the complete batch as read from the server + * @throws CommunicationException if the server's response is invalid + */ + public CompleteBatch call() throws CommunicationException{ + + CompleteBatch completeBatch = new CompleteBatch(); + + Client client = clientLocal.get(); + + WebTarget webTarget; + Response response; + + // Set filters for the batch message metadata retrieval + + MessageFilterList messageFilterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BATCH_ID_TAG_PREFIX + String.valueOf(payload.getBatchId())) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(payload.getSignerId()) + .build()) + .build(); + + // Send request to Server + + webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post( + Entity.entity(messageFilterList, Constants.MEDIATYPE_PROTOBUF)); + + // Retrieve answer + + try { + + // If a BulletinBoardMessageList is returned: the read was successful + BulletinBoardMessage metadata = response.readEntity(BulletinBoardMessageList.class).getMessage(0); + + completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() + .setSignerId(payload.getSignerId()) + .setBatchId(payload.getBatchId()) + .addAllTag(metadata.getMsg().getTagList()) + .build()); + + completeBatch.setSignature(metadata.getSig(0)); + + } catch (ProcessingException | IllegalStateException e) { + + // Read failed + throw new CommunicationException("Could not contact the server"); + + } + finally { + response.close(); + } + + // Get the batch data + + webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH); + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post( + Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF)); + + // Retrieve answer + + try { + + // If a List of BatchData is returned: the read was successful + + completeBatch.appendBatchData(response.readEntity(new GenericType>(){})); + + } catch (ProcessingException | IllegalStateException e) { + + // Read failed + throw new CommunicationException("Could not contact the server"); + + } + finally { + response.close(); + } + + return completeBatch; + + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java similarity index 97% rename from bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java rename to bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java index f8975cb..6c09bcc 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java @@ -1,4 +1,4 @@ -package meerkat.bulletinboard.workers; +package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index cab9d6d..fe3d2fc 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -141,6 +141,10 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } } + @Path(READ_BATCH_PATH) + @POST + @Consumes(MEDIATYPE_PROTOBUF) + @Produces(MEDIATYPE_PROTOBUF) @Override public List readBatch(BatchSpecificationMessage message) { try { diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index 00bd2ac..7732fcb 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -1,7 +1,7 @@ package meerkat.bulletinboard; +import com.google.protobuf.ByteString; import meerkat.protobuf.BulletinBoardAPI.*; -import meerkat.protobuf.Crypto.Signature; import java.util.List; @@ -37,36 +37,47 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { /** * This message informs the server about the existence of a new batch message and supplies it with the tags associated with it - * @param signerId is the canonical form for the ID of the sender of this batch - * @param batchId is a unique (per signer) ID for this batch - * @param tagList is a list of tags that belong to the batch message + * @param beginBatchMessage contains the data required to begin the batch + * @param callback is a callback function class for handling results of the operation */ - public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback); + public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback); /** * This method posts batch data into an (assumed to be open) batch * It does not close the batch * @param signerId is the canonical form for the ID of the sender of this batch * @param batchId is a unique (per signer) ID for this batch - * @param batchDataList is the (canonically ordered) list of data comprising the entire batch message (not just the portion to be written) + * @param batchDataList is the (canonically ordered) list of data comprising the portion of the batch to be posted * @param startPosition is the location (in the batch) of the first entry in batchDataList * (optionally used to continue interrupted post operations) + * The first position in the batch is position 0 * @param callback is a callback function class for handling results of the operation */ public void postBatchData(byte[] signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback); + int startPosition, ClientCallback callback); /** - * Overloading of the postBatchData method in which startPosition is set to the default value 0 + * Overloading of the postBatchData method which starts at the first position in the batch */ public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback); + /** + * Overloading of the postBatchData method which uses ByteString + */ + public void postBatchData(ByteString signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback); + + /** + * Overloading of the postBatchData method which uses ByteString and starts at the first position in the batch + */ + public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback); + /** * Attempts to close a batch message * @param closeBatchMessage contains the data required to close the batch * @param callback is a callback function class for handling results of the operation */ - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback); + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback); /** * Check how "safe" a given message is in an asynchronous manner @@ -88,11 +99,10 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { /** * Read a given batch message from the bulletin board - * @param signerId is the ID of the signer (sender) of the batch message - * @param batchId is the unique (per signer) ID of the batch + * @param batchSpecificationMessage contains the data required to specify a single batch instance * @param callback is a callback class for handling the result of the operation */ - public void readBatch(byte[] signerId, int batchId, ClientCallback callback); + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback); /** * Subscribes to a notifier that will return any new messages on the server that match the given filters diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java index 0db1d3f..6bfc06f 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java @@ -9,6 +9,7 @@ public interface BulletinBoardConstants { public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver"; public static final String READ_MESSAGES_PATH = "/readmessages"; + public static final String READ_BATCH_PATH = "/readbatch"; public static final String POST_MESSAGE_PATH = "/postmessage"; public static final String BEGIN_BATCH_PATH = "/beginbatch"; public static final String POST_BATCH_PATH = "/postbatch"; @@ -17,6 +18,6 @@ public interface BulletinBoardConstants { // Other Constants public static final String BATCH_TAG = "@BATCH"; - public static final String BATCH_ID_TAG_PREFIX = "#"; + public static final String BATCH_ID_TAG_PREFIX = "BATCHID#"; } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java index 8171271..4f25f59 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java @@ -2,7 +2,7 @@ package meerkat.bulletinboard; import com.google.protobuf.Message; import meerkat.crypto.Digest; -import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage; +import meerkat.protobuf.BulletinBoardAPI.MessageID; import meerkat.protobuf.BulletinBoardAPI.BatchData; import java.util.List; @@ -36,6 +36,11 @@ public class GenericBatchDigest implements BatchDigest{ return digest.digest(); } + @Override + public MessageID digestAsMessageID() { + return digest.digestAsMessageID(); + } + @Override public void update(Message msg) { digest.update(msg); diff --git a/meerkat-common/src/main/java/meerkat/crypto/Digest.java b/meerkat-common/src/main/java/meerkat/crypto/Digest.java index c72206e..b7d86dc 100644 --- a/meerkat-common/src/main/java/meerkat/crypto/Digest.java +++ b/meerkat-common/src/main/java/meerkat/crypto/Digest.java @@ -1,6 +1,7 @@ package meerkat.crypto; import com.google.protobuf.Message; +import meerkat.protobuf.BulletinBoardAPI.MessageID; import java.security.MessageDigest; @@ -15,6 +16,12 @@ public interface Digest { */ public byte[] digest(); + /** + * Completes the hash computation and returns a MessageID Protobuf as output + * @return + */ + public MessageID digestAsMessageID(); + /** * Updates the digest using the specified message (in serialized wire form) * diff --git a/meerkat-common/src/main/java/meerkat/crypto/concrete/SHA256Digest.java b/meerkat-common/src/main/java/meerkat/crypto/concrete/SHA256Digest.java index 4aac501..a7723ec 100644 --- a/meerkat-common/src/main/java/meerkat/crypto/concrete/SHA256Digest.java +++ b/meerkat-common/src/main/java/meerkat/crypto/concrete/SHA256Digest.java @@ -3,6 +3,7 @@ package meerkat.crypto.concrete; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import meerkat.crypto.Digest; +import meerkat.protobuf.BulletinBoardAPI.MessageID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,11 @@ public class SHA256Digest implements Digest { return hash.digest(); } + @Override + public MessageID digestAsMessageID() { + return MessageID.newBuilder().setID(ByteString.copyFrom(digest())).build(); + } + @Override public void update(Message msg) {