diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java index 8db836f..727a922 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java @@ -2,7 +2,7 @@ package meerkat.bulletinboard; import com.google.common.util.concurrent.FutureCallback; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import java.util.Collections; import java.util.Iterator; @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger; * This is a general class for handling multi-server work * It utilizes Single Server Clients to perform the actual per-server work */ -public abstract class MultiServerWorker extends BulletinClientWorker implements Runnable, ClientCallback{ +public abstract class MultiServerWorker extends BulletinClientWorker implements Runnable, FutureCallback{ private List clients; @@ -26,7 +26,7 @@ public abstract class MultiServerWorker extends BulletinClientWorker clientCallback; + private FutureCallback futureCallback; /** * Constructor @@ -35,11 +35,11 @@ public abstract class MultiServerWorker extends BulletinClientWorker clients, boolean shuffleClients, int minServers, IN payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { super(payload,maxRetry); @@ -50,7 +50,7 @@ public abstract class MultiServerWorker extends BulletinClientWorker extends BulletinClientWorker clients, int minServers, IN payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - this(clients, false, minServers, payload, maxRetry, clientCallback); + this(clients, false, minServers, payload, maxRetry, futureCallback); } @@ -74,7 +74,7 @@ public abstract class MultiServerWorker extends BulletinClientWorker extends BulletinClientWorker implements FutureCallback { private SingleServerWorker worker; - private ClientCallback clientCallback; + private FutureCallback futureCallback; - public RetryCallback(SingleServerWorker worker, ClientCallback clientCallback) { + public RetryCallback(SingleServerWorker worker, FutureCallback futureCallback) { this.worker = worker; - this.clientCallback = clientCallback; + this.futureCallback = futureCallback; } @Override public void onSuccess(T result) { - clientCallback.handleCallback(result); + futureCallback.onSuccess(result); } @Override @@ -107,19 +114,210 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i scheduleWorker(worker, this); } else { // No more retries: notify caller about failure - clientCallback.handleFailure(t); + futureCallback.onFailure(t); } } } + /** + * This callback ties together all the per-batch-data callbacks into a single callback + * It reports success back to the user only if all of the batch-data were successfully posted + * If any batch-data fails to post: this callback reports failure + */ + class PostBatchDataListCallback implements FutureCallback { - public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds) { + private FutureCallback callback; + private AtomicInteger batchDataRemaining; + private AtomicBoolean aggregatedResult; + + public PostBatchDataListCallback(int batchDataLength, FutureCallback callback) { + + this.callback = callback; + this.batchDataRemaining = new AtomicInteger(batchDataLength); + this.aggregatedResult = new AtomicBoolean(false); + + } + + @Override + public void onSuccess(Boolean result) { + + if (result){ + this.aggregatedResult.set(true); + } + + if (batchDataRemaining.decrementAndGet() == 0){ + callback.onSuccess(this.aggregatedResult.get()); + } + } + + @Override + public void onFailure(Throwable t) { + + // Notify caller about failure + callback.onFailure(t); + + } + } + + /** + * This callback ties together the different parts of a CompleteBatch as they arrive from the server + * It assembles a CompleteBatch from the parts and sends it to the user if all parts arrived + * If any part fails to arrive: it invokes the onFailure method + */ + class CompleteBatchReadCallback { + + private FutureCallback callback; + + private List batchDataList; + private BulletinBoardMessage batchMessage; + + private AtomicInteger remainingQueries; + private AtomicBoolean failed; + + public CompleteBatchReadCallback(FutureCallback callback) { + + this.callback = callback; + + remainingQueries = new AtomicInteger(2); + failed = new AtomicBoolean(false); + + } + + protected void combineAndReturn() { + + final String[] prefixes = { + BulletinBoardConstants.BATCH_ID_TAG_PREFIX, + BulletinBoardConstants.BATCH_TAG}; + + if (remainingQueries.decrementAndGet() == 0){ + + BeginBatchMessage beginBatchMessage = + BeginBatchMessage.newBuilder() + .setSignerId(batchMessage.getSig(0).getSignerId()) + .setBatchId(Integer.parseInt( + BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX))) + .addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(prefixes))) + .build(); + callback.onSuccess(new CompleteBatch(beginBatchMessage, batchDataList, batchMessage.getSig(0))); + } + + } + + protected void fail(Throwable t) { + if (failed.compareAndSet(false, true)) { + callback.onFailure(t); + } + } + + /** + * @return a FutureCallback for the Batch Data List that ties to this object + */ + public FutureCallback> asBatchDataListFutureCallback() { + return new FutureCallback>() { + + @Override + public void onSuccess(List result) { + batchDataList = result; + + combineAndReturn(); + } + + @Override + public void onFailure(Throwable t) { + fail(t); + } + + }; + } + + /** + * @return a FutureCallback for the Bulletin Board Message that ties to this object + */ + public FutureCallback> asBulletinBoardMessageListFutureCallback() { + return new FutureCallback>() { + + @Override + public void onSuccess(List result) { + if (result.size() < 1){ + onFailure(new IllegalArgumentException("Server returned empty message list")); + return; + } + + batchMessage = result.get(0); + + combineAndReturn(); + } + + @Override + public void onFailure(Throwable t) { + fail(t); + } + }; + } + + } + + + /** + * Inner class for handling returned values of subscription operations + * This class's methods also ensure continued operation of the subscription + */ + class SubscriptionCallback implements FutureCallback> { + + private SingleServerReadMessagesWorker worker; + private MessageHandler messageHandler; + + private MessageFilterList.Builder filterBuilder; + + public SubscriptionCallback(SingleServerReadMessagesWorker worker, MessageHandler messageHandler) { + this.worker = worker; + this.messageHandler = messageHandler; + filterBuilder = worker.getPayload().toBuilder(); + + } + + @Override + public void onSuccess(List result) { + + // Report new messages to user + messageHandler.handleNewMessages(result); + + // Remove last filter from list (MIN_ENTRY one) + filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + + // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) + filterBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(result.get(result.size() - 1).getEntryNum() + 1) + .build()); + + // Create new worker with updated task + worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), 1); + + // Schedule the worker + scheduleWorker(worker, this); + + } + + @Override + public void onFailure(Throwable t) { + + // Notify client about failure + fail(); + + // Reschedule exact same task + scheduleWorker(worker, this); + } + } + + public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) { executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); this.failDelayInMilliseconds = failDelayInMilliseconds; + this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; // Set server error time to a time sufficiently in the past to make new jobs go through lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds; @@ -147,7 +345,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback) { + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES); @@ -162,18 +360,18 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } - private class PostBatchDataCallback implements ClientCallback { + private class PostBatchDataCallback implements FutureCallback { private CompleteBatch completeBatch; - ClientCallback callback; + FutureCallback callback; - public PostBatchDataCallback(CompleteBatch completeBatch, ClientCallback callback) { + public PostBatchDataCallback(CompleteBatch completeBatch, FutureCallback callback) { this.completeBatch = completeBatch; this.callback = callback; } @Override - public void handleCallback(Boolean msg) { + public void onSuccess(Boolean msg) { closeBatch( CloseBatchMessage.newBuilder() .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) @@ -185,24 +383,24 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void handleFailure(Throwable t) { - callback.handleFailure(t); + public void onFailure(Throwable t) { + callback.onFailure(t); } } - private class BeginBatchCallback implements ClientCallback { + private class BeginBatchCallback implements FutureCallback { private CompleteBatch completeBatch; - ClientCallback callback; + FutureCallback callback; - public BeginBatchCallback(CompleteBatch completeBatch, ClientCallback callback) { + public BeginBatchCallback(CompleteBatch completeBatch, FutureCallback callback) { this.completeBatch = completeBatch; this.callback = callback; } @Override - public void handleCallback(Boolean msg) { + public void onSuccess(Boolean msg) { postBatchData( completeBatch.getBeginBatchMessage().getSignerId(), @@ -213,13 +411,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void handleFailure(Throwable t) { - callback.handleFailure(t); + public void onFailure(Throwable t) { + callback.onFailure(t); } } @Override - public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { beginBatch( completeBatch.getBeginBatchMessage(), @@ -233,7 +431,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback) { + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerBeginBatchWorker worker = @@ -246,12 +444,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void postBatchData(ByteString signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback) { + int startPosition, FutureCallback callback) { BatchMessage.Builder builder = BatchMessage.newBuilder() .setSignerId(signerId) .setBatchId(batchId); + // Create a unified callback to aggregate successful posts + + PostBatchDataListCallback listCallback = new PostBatchDataListCallback(batchDataList.size(), callback); + // Iterate through data list for (BatchData data : batchDataList) { @@ -262,7 +464,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES); // Create worker with redundancy 1 and MAX_RETRIES retries - scheduleWorker(worker, new RetryCallback(worker, callback)); + scheduleWorker(worker, new RetryCallback(worker, listCallback)); // Increment position in batch startPosition++; @@ -271,7 +473,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback) { + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { postBatchData(signerId, batchId, batchDataList, 0, callback); @@ -279,21 +481,21 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void postBatchData(byte[] signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback) { + int startPosition, FutureCallback callback) { postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback); } @Override - public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { postBatchData(signerId, batchId, batchDataList, 0, callback); } @Override - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerCloseBatchWorker worker = @@ -305,7 +507,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void getRedundancy(MessageID id, ClientCallback callback) { + public void getRedundancy(MessageID id, FutureCallback callback) { // Create worker with no retries SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1); @@ -316,7 +518,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void readMessages(MessageFilterList filterList, ClientCallback> callback) { + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { // Create job with no retries SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1); @@ -327,19 +529,65 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback) { + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { - // Create job with no retries - SingleServerReadBatchWorker worker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1); + // Create job with no retries for retrieval of the Bulletin Board Message that defines the batch - // Submit job and create callback - scheduleWorker(worker, new RetryCallback(worker, callback)); + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_TAG) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId()) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(batchSpecificationMessage.getSignerId()) + .build()) + .build(); + + SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1); + + // Create job with no retries for retrieval of the Batch Data List + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1); + + // Create callback that will combine the two worker products + CompleteBatchReadCallback completeBatchReadCallback = new CompleteBatchReadCallback(callback); + + // Submit jobs with wrapped callbacks + scheduleWorker(messageWorker, new RetryCallback(messageWorker, completeBatchReadCallback.asBulletinBoardMessageListFutureCallback())); + scheduleWorker(batchWorker, new RetryCallback(batchWorker, completeBatchReadCallback.asBatchDataListFutureCallback())); } @Override public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { + // Remove all existing MIN_ENTRY filters and create new one that starts at 0 + + MessageFilterList.Builder filterListBuilder = filterList.toBuilder(); + + Iterator iterator = filterListBuilder.getFilterList().iterator(); + while (iterator.hasNext()) { + MessageFilter filter = iterator.next(); + + if (filter.getType() == FilterType.MIN_ENTRY){ + iterator.remove(); + } + } + filterListBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(0) + .build()); + + // Create job with no retries + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), 1); + + // Submit job and create callback + scheduleWorker(worker, new SubscriptionCallback(worker, messageHandler)); + } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index 78d5dba..76e6236 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -1,5 +1,6 @@ package meerkat.bulletinboard; +import com.google.common.util.concurrent.FutureCallback; import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.multiserver.*; @@ -37,6 +38,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple private static final int SERVER_THREADPOOL_SIZE = 5; private static final long FAIL_DELAY = 5000; + private static final long SUBSCRIPTION_INTERVAL = 10000; private int minAbsoluteRedundancy; @@ -59,11 +61,16 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple clients = new ArrayList(clientParams.getBulletinBoardAddressCount()); for (String address : clientParams.getBulletinBoardAddressList()){ - SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY); + + SingleServerBulletinBoardClient client = + new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY, SUBSCRIPTION_INTERVAL); + client.init(BulletinBoardClientParams.newBuilder() .addBulletinBoardAddress(address) .build()); + clients.add(client); + } } @@ -76,7 +83,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple * @throws CommunicationException */ @Override - public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback){ + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback){ // Create job MultiServerPostMessageWorker worker = @@ -93,7 +100,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { // Create job MultiServerPostBatchWorker worker = @@ -110,7 +117,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback) { + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { // Create job MultiServerBeginBatchWorker worker = @@ -123,7 +130,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple @Override public void postBatchData(byte[] signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback) { + int startPosition, FutureCallback callback) { BatchDataContainer batchDataContainer = new BatchDataContainer(signerId, batchId, batchDataList, startPosition); @@ -137,7 +144,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { postBatchData(signerId, batchId, batchDataList, 0, callback); @@ -145,21 +152,21 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple @Override public void postBatchData(ByteString signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback) { + int startPosition, FutureCallback callback) { postBatchData(signerId.toByteArray(), batchId, batchDataList, startPosition, callback); } @Override - public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback) { + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { postBatchData(signerId, batchId, batchDataList, 0, callback); } @Override - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { // Create job MultiServerCloseBatchWorker worker = @@ -177,7 +184,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple * Ignore communication exceptions in specific databases */ @Override - public void getRedundancy(MessageID id, ClientCallback callback) { + public void getRedundancy(MessageID id, FutureCallback callback) { // Create job MultiServerGetRedundancyWorker worker = @@ -194,7 +201,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple * If no operation is successful: return null (NOT blank list) */ @Override - public void readMessages(MessageFilterList filterList, ClientCallback> callback) { + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { // Create job MultiServerReadMessagesWorker worker = @@ -206,7 +213,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback) { + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { // Create job MultiServerReadBatchWorker worker = @@ -227,6 +234,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple super.close(); try { + + for (SingleServerBulletinBoardClient client : clients){ + client.close(); + } + executorService.shutdown(); while (! executorService.isShutdown()) { executorService.awaitTermination(10, TimeUnit.SECONDS); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java index dc496d7..e0e92bb 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerBeginBatchWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage; @@ -13,9 +13,9 @@ public class MultiServerBeginBatchWorker extends MultiServerGenericPostWorker clients, int minServers, BeginBatchMessage payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java index 56b09c5..300440f 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerCloseBatchWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage; @@ -13,9 +13,9 @@ public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker clients, int minServers, CloseBatchMessage payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java index 8b62d4e..4ff96b1 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericPostWorker.java @@ -1,6 +1,7 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.MultiServerWorker; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.comm.CommunicationException; @@ -17,9 +18,9 @@ public abstract class MultiServerGenericPostWorker extends MultiServerWorker< public MultiServerGenericPostWorker(List clients, int minServers, T payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } @@ -35,11 +36,6 @@ public abstract class MultiServerGenericPostWorker extends MultiServerWorker< */ public void run() { - WebTarget webTarget; - Response response; - - int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request. - // Iterate through servers Iterator clientIterator = getClientIterator(); @@ -56,7 +52,7 @@ public abstract class MultiServerGenericPostWorker extends MultiServerWorker< } @Override - public void handleCallback(Boolean result) { + public void onSuccess(Boolean result) { if (result){ if (minServers.decrementAndGet() <= 0){ succeed(Boolean.TRUE); @@ -65,7 +61,7 @@ public abstract class MultiServerGenericPostWorker extends MultiServerWorker< } @Override - public void handleFailure(Throwable t) { + public void onFailure(Throwable t) { if (maxFailedServers.decrementAndGet() < 0){ fail(t); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericReadWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericReadWorker.java index 6f6b425..68fc020 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericReadWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGenericReadWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.MultiServerWorker; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.comm.CommunicationException; @@ -18,9 +18,9 @@ public abstract class MultiServerGenericReadWorker extends MultiServerW public MultiServerGenericReadWorker(List clients, int minServers, IN payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load + super(clients, true, minServers, payload, maxRetry, futureCallback); // Shuffle clients on creation to balance load clientIterator = getClientIterator(); @@ -41,10 +41,10 @@ public abstract class MultiServerGenericReadWorker extends MultiServerW if (clientIterator.hasNext()) { - // Send request to Server + // Get next server SingleServerBulletinBoardClient client = clientIterator.next(); - // Retrieve answer + // Retrieve answer from server doRead(payload, client); } else { @@ -54,12 +54,12 @@ public abstract class MultiServerGenericReadWorker extends MultiServerW } @Override - public void handleCallback(OUT msg) { + public void onSuccess(OUT msg) { succeed(msg); } @Override - public void handleFailure(Throwable t) { + public void onFailure(Throwable t) { run(); // Retry with next server } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java index 5675cb8..517dbdf 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerGetRedundancyWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.MultiServerWorker; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.comm.CommunicationException; @@ -20,9 +20,9 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker clients, int minServers, MessageID payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load + super(clients, minServers, payload, maxRetry, futureCallback); // Shuffle clients on creation to balance load serversContainingMessage = new AtomicInteger(0); totalContactedServers = new AtomicInteger(0); @@ -54,7 +54,7 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker 0.5) { serversContainingMessage.incrementAndGet(); @@ -67,8 +67,8 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker clients, int minServers, BatchDataContainer payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java index 7c2f586..1b1f3df 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostBatchWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.SingleServerBulletinBoardClient; @@ -13,9 +13,9 @@ public class MultiServerPostBatchWorker extends MultiServerGenericPostWorker clients, int minServers, CompleteBatch payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java index 33f9a3c..6d3d702 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerPostMessageWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.*; @@ -13,9 +13,9 @@ public class MultiServerPostMessageWorker extends MultiServerGenericPostWorker clients, int minServers, BulletinBoardMessage payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java index 737c15c..3d40c8a 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.BatchSpecificationMessage; @@ -15,9 +15,9 @@ public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker clients, int minServers, BatchSpecificationMessage payload, int maxRetry, - ClientCallback clientCallback) { + FutureCallback futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java index b276eab..980d869 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessagesWorker.java @@ -1,6 +1,6 @@ package meerkat.bulletinboard.workers.multiserver; -import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import com.google.common.util.concurrent.FutureCallback; import meerkat.bulletinboard.SingleServerBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.*; @@ -14,9 +14,9 @@ public class MultiServerReadMessagesWorker extends MultiServerGenericReadWorker< public MultiServerReadMessagesWorker(List clients, int minServers, MessageFilterList payload, int maxRetry, - ClientCallback> clientCallback) { + FutureCallback> futureCallback) { - super(clients, minServers, payload, maxRetry, clientCallback); + super(clients, minServers, payload, maxRetry, futureCallback); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java index 61556fc..11fc777 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java @@ -23,7 +23,7 @@ import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_ID_TAG_PREFIX; /** * Created by Arbel Deutsch Peled on 27-Dec-15. */ -public class SingleServerReadBatchWorker extends SingleServerWorker { +public class SingleServerReadBatchWorker extends SingleServerWorker> { public SingleServerReadBatchWorker(String serverAddress, BatchSpecificationMessage payload, int maxRetry) { super(serverAddress, payload, maxRetry); @@ -35,59 +35,13 @@ public class SingleServerReadBatchWorker extends SingleServerWorker call() throws CommunicationException{ Client client = clientLocal.get(); WebTarget webTarget; Response response; - // Set filters for the batch message metadata retrieval - - MessageFilterList messageFilterList = MessageFilterList.newBuilder() - .addFilter(MessageFilter.newBuilder() - .setType(FilterType.TAG) - .setTag(BATCH_ID_TAG_PREFIX + String.valueOf(payload.getBatchId())) - .build()) - .addFilter(MessageFilter.newBuilder() - .setType(FilterType.SIGNER_ID) - .setId(payload.getSignerId()) - .build()) - .build(); - - // Send request to Server - - webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); - response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post( - Entity.entity(messageFilterList, Constants.MEDIATYPE_PROTOBUF)); - - // Retrieve answer - - try { - - // If a BulletinBoardMessageList is returned: the read was successful - BulletinBoardMessage metadata = response.readEntity(BulletinBoardMessageList.class).getMessage(0); - - completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() - .setSignerId(payload.getSignerId()) - .setBatchId(payload.getBatchId()) - .addAllTag(metadata.getMsg().getTagList()) - .build()); - - completeBatch.setSignature(metadata.getSig(0)); - - } catch (ProcessingException | IllegalStateException e) { - - // Read failed - throw new CommunicationException("Could not contact the server"); - - } - finally { - response.close(); - } - // Get the batch data webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH); @@ -98,9 +52,8 @@ public class SingleServerReadBatchWorker extends SingleServerWorker>(){})); + // If a BatchDataList is returned: the read was successful + return response.readEntity(BatchDataList.class).getDataList(); } catch (ProcessingException | IllegalStateException e) { @@ -112,8 +65,6 @@ public class SingleServerReadBatchWorker extends SingleServerWorker thrown; - - protected void genericHandleFailure(Throwable t){ - System.err.println(t.getCause() + " " + t.getMessage()); - thrown.add(t); - jobSemaphore.release(); - } - - private class PostCallback implements ClientCallback{ - - @Override - public void handleCallback(Boolean msg) { - System.err.println("Post operation completed"); - jobSemaphore.release(); - } - - @Override - public void handleFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private class RedundancyCallback implements ClientCallback{ - - private float minRedundancy; - - public RedundancyCallback(float minRedundancy) { - this.minRedundancy = minRedundancy; - } - - @Override - public void handleCallback(Float redundancy) { - System.err.println("Redundancy found is: " + redundancy); - jobSemaphore.release(); - assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); - } - - @Override - public void handleFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private class ReadCallback implements ClientCallback>{ - - private List expectedMsgList; - - public ReadCallback(List expectedMsgList) { - this.expectedMsgList = expectedMsgList; - } - - @Override - public void handleCallback(List messages) { - - System.err.println(messages); - jobSemaphore.release(); - - BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); - - assertThat(messages.size(), is(expectedMsgList.size())); - - Iterator expectedMessageIterator = expectedMsgList.iterator(); - Iterator receivedMessageIterator = messages.iterator(); - - while (expectedMessageIterator.hasNext()) { - assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); - } - - } - - @Override - public void handleFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private AsyncBulletinBoardClient bulletinBoardClient; - - private PostCallback postCallback; - private RedundancyCallback redundancyCallback; - private ReadCallback readCallback; - - private static String PROP_GETTY_URL = "gretty.httpBaseURI"; - private static String DEFAULT_BASE_URL = "http://localhost:8081"; - private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); - - @Before - public void init(){ - - bulletinBoardClient = new ThreadedBulletinBoardClient(); - - List testDB = new LinkedList(); - testDB.add(BASE_URL); - - bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() - .addBulletinBoardAddress("http://localhost:8081") - .setMinRedundancy((float) 1.0) - .build()); - - postCallback = new PostCallback(); - redundancyCallback = new RedundancyCallback((float) 1.0); - - thrown = new Vector<>(); - jobSemaphore = new Semaphore(0); - - } - - @Test - public void postTest() { - - byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; - byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; - byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24}; - byte[] b4 = {(byte) 4, (byte) 5, (byte) 100, (byte) -50, (byte) 0}; - - BulletinBoardMessage msg; - - MessageFilterList filterList; - List msgList; - - MessageID messageID; - - Comparator msgComparator = new BulletinBoardMessageComparator(); - - msg = BulletinBoardMessage.newBuilder() - .setMsg(UnsignedBulletinBoardMessage.newBuilder() - .addTag("Signature") - .addTag("Trustee") - .setData(ByteString.copyFrom(b1)) - .build()) - .addSig(Crypto.Signature.newBuilder() - .setType(Crypto.SignatureType.DSA) - .setData(ByteString.copyFrom(b2)) - .setSignerId(ByteString.copyFrom(b3)) - .build()) - .addSig(Crypto.Signature.newBuilder() - .setType(Crypto.SignatureType.ECDSA) - .setData(ByteString.copyFrom(b3)) - .setSignerId(ByteString.copyFrom(b2)) - .build()) - .build(); - - messageID = bulletinBoardClient.postMessage(msg,postCallback); - - try { - jobSemaphore.acquire(); - } catch (InterruptedException e) { - System.err.println(e.getCause() + " " + e.getMessage()); - } - - bulletinBoardClient.getRedundancy(messageID,redundancyCallback); - - filterList = MessageFilterList.newBuilder() - .addFilter( - MessageFilter.newBuilder() - .setType(FilterType.TAG) - .setTag("Signature") - .build() - ) - .addFilter( - MessageFilter.newBuilder() - .setType(FilterType.TAG) - .setTag("Trustee") - .build() - ) - .build(); - - msgList = new LinkedList(); - msgList.add(msg); - - readCallback = new ReadCallback(msgList); - - bulletinBoardClient.readMessages(filterList, readCallback); - try { - jobSemaphore.acquire(2); - } catch (InterruptedException e) { - System.err.println(e.getCause() + " " + e.getMessage()); - } - - bulletinBoardClient.close(); - - if (thrown.size() > 0) { - assert false; - } - - } - -} diff --git a/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java new file mode 100644 index 0000000..c266086 --- /dev/null +++ b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java @@ -0,0 +1,541 @@ +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; +import meerkat.bulletinboard.AsyncBulletinBoardClient; +import meerkat.bulletinboard.CompleteBatch; +import meerkat.bulletinboard.GenericBatchDigitalSignature; +import meerkat.bulletinboard.ThreadedBulletinBoardClient; +import meerkat.comm.CommunicationException; +import meerkat.crypto.concrete.ECDSASignature; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Crypto; + +import meerkat.protobuf.Voting.*; +import meerkat.util.BulletinBoardMessageComparator; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.number.OrderingComparison.*; + +import java.io.IOException; +import java.io.InputStream; +import java.security.*; +import java.security.cert.CertificateException; +import java.util.*; +import java.util.concurrent.Semaphore; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class ThreadedBulletinBoardClientIntegrationTest { + + // Signature resources + + private GenericBatchDigitalSignature signers[]; + private ByteString[] signerIDs; + + private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; + private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12"; + + private static String KEYFILE_PASSWORD1 = "secret"; + private static String KEYFILE_PASSWORD3 = "shh"; + + public static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; + public static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; + + // Server data + + private static String PROP_GETTY_URL = "gretty.httpBaseURI"; + private static String DEFAULT_BASE_URL = "http://localhost:8081"; + private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); + + // Client and callbacks + + private AsyncBulletinBoardClient bulletinBoardClient; + + private PostCallback postCallback; + private PostCallback failPostCallback = new PostCallback(true,false); + + private RedundancyCallback redundancyCallback; + private ReadCallback readCallback; + private ReadBatchCallback readBatchCallback; + + // Sync and misc + + private Semaphore jobSemaphore; + private Vector thrown; + private Random random; + + // Constructor + + public ThreadedBulletinBoardClientIntegrationTest(){ + + signers = new GenericBatchDigitalSignature[2]; + signerIDs = new ByteString[signers.length]; + signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + + InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); + char[] password = KEYFILE_PASSWORD1.toCharArray(); + + KeyStore.Builder keyStoreBuilder = null; + try { + keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); + + signers[0].loadSigningCertificate(keyStoreBuilder); + + signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); + + keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); + password = KEYFILE_PASSWORD3.toCharArray(); + + keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); + signers[1].loadSigningCertificate(keyStoreBuilder); + + signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); + + for (int i = 0 ; i < signers.length ; i++) { + signerIDs[i] = signers[i].getSignerID(); + } + + } catch (IOException e) { + System.err.println("Failed reading from signature file " + e.getMessage()); + fail("Failed reading from signature file " + e.getMessage()); + } catch (CertificateException e) { + System.err.println("Failed reading certificate " + e.getMessage()); + fail("Failed reading certificate " + e.getMessage()); + } catch (KeyStoreException e) { + System.err.println("Failed reading keystore " + e.getMessage()); + fail("Failed reading keystore " + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + System.err.println("Couldn't find signing algorithm " + e.getMessage()); + fail("Couldn't find signing algorithm " + e.getMessage()); + } catch (UnrecoverableKeyException e) { + System.err.println("Couldn't find signing key " + e.getMessage()); + fail("Couldn't find signing key " + e.getMessage()); + } + + } + + // Callback definitions + + protected void genericHandleFailure(Throwable t){ + System.err.println(t.getCause() + " " + t.getMessage()); + thrown.add(t); + jobSemaphore.release(); + } + + private class PostCallback implements FutureCallback{ + + private boolean isAssert; + private boolean assertValue; + + public PostCallback() { + this(false); + } + + public PostCallback(boolean isAssert) { + this(isAssert,true); + } + + public PostCallback(boolean isAssert, boolean assertValue) { + this.isAssert = isAssert; + this.assertValue = assertValue; + } + + @Override + public void onSuccess(Boolean msg) { + System.err.println("Post operation completed"); + jobSemaphore.release(); + //TODO: Change Assert mechanism to exception one + if (isAssert) { + if (assertValue) { + assertThat("Post operation failed", msg, is(Boolean.TRUE)); + } else { + assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE)); + } + } + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class RedundancyCallback implements FutureCallback{ + + private float minRedundancy; + + public RedundancyCallback(float minRedundancy) { + this.minRedundancy = minRedundancy; + } + + @Override + public void onSuccess(Float redundancy) { + System.err.println("Redundancy found is: " + redundancy); + jobSemaphore.release(); + assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class ReadCallback implements FutureCallback>{ + + private List expectedMsgList; + + public ReadCallback(List expectedMsgList) { + this.expectedMsgList = expectedMsgList; + } + + @Override + public void onSuccess(List messages) { + + System.err.println(messages); + jobSemaphore.release(); + + BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); + + assertThat(messages.size(), is(expectedMsgList.size())); + + Iterator expectedMessageIterator = expectedMsgList.iterator(); + Iterator receivedMessageIterator = messages.iterator(); + + while (expectedMessageIterator.hasNext()) { + assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); + } + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class ReadBatchCallback implements FutureCallback { + + private CompleteBatch expectedBatch; + + public ReadBatchCallback(CompleteBatch expectedBatch) { + this.expectedBatch = expectedBatch; + } + + @Override + public void onSuccess(CompleteBatch batch) { + + System.err.println(batch); + jobSemaphore.release(); + + assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch))); + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + // Randomness generators + + private byte randomByte(){ + return (byte) random.nextInt(); + } + + private byte[] randomByteArray(int length) { + + byte[] randomBytes = new byte[length]; + + for (int i = 0; i < length ; i++){ + randomBytes[i] = randomByte(); + } + + return randomBytes; + + } + + private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException { + + CompleteBatch completeBatch = new CompleteBatch(); + + // Create data + + completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() + .setSignerId(signerIDs[signer]) + .setBatchId(batchId) + .addTag("Test") + .build()); + + for (int i = 0 ; i < length ; i++){ + + BatchData batchData = BatchData.newBuilder() + .setData(ByteString.copyFrom(randomByteArray(i))) + .build(); + + completeBatch.appendBatchData(batchData); + + } + + signers[signer].updateContent(completeBatch); + + completeBatch.setSignature(signers[signer].sign()); + + return completeBatch; + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + bulletinBoardClient = new ThreadedBulletinBoardClient(); + + random = new Random(0); // We use insecure randomness in tests for repeatability + + List testDB = new LinkedList(); + testDB.add(BASE_URL); + + bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() + .addBulletinBoardAddress("http://localhost:8081") + .setMinRedundancy((float) 1.0) + .build()); + + postCallback = new PostCallback(); + redundancyCallback = new RedundancyCallback((float) 1.0); + + thrown = new Vector<>(); + jobSemaphore = new Semaphore(0); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + bulletinBoardClient.close(); + + if (thrown.size() > 0) { + assert false; + } + + } + + /** + * Tests the standard post, redundancy and read methods + */ + @Test + public void postTest() { + + byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; + byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; + byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24}; + byte[] b4 = {(byte) 4, (byte) 5, (byte) 100, (byte) -50, (byte) 0}; + + BulletinBoardMessage msg; + + MessageFilterList filterList; + List msgList; + + MessageID messageID; + + Comparator msgComparator = new BulletinBoardMessageComparator(); + + msg = BulletinBoardMessage.newBuilder() + .setMsg(UnsignedBulletinBoardMessage.newBuilder() + .addTag("Signature") + .addTag("Trustee") + .setData(ByteString.copyFrom(b1)) + .build()) + .addSig(Crypto.Signature.newBuilder() + .setType(Crypto.SignatureType.DSA) + .setData(ByteString.copyFrom(b2)) + .setSignerId(ByteString.copyFrom(b3)) + .build()) + .addSig(Crypto.Signature.newBuilder() + .setType(Crypto.SignatureType.ECDSA) + .setData(ByteString.copyFrom(b3)) + .setSignerId(ByteString.copyFrom(b2)) + .build()) + .build(); + + messageID = bulletinBoardClient.postMessage(msg,postCallback); + + try { + jobSemaphore.acquire(); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + bulletinBoardClient.getRedundancy(messageID,redundancyCallback); + + filterList = MessageFilterList.newBuilder() + .addFilter( + MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag("Signature") + .build() + ) + .addFilter( + MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag("Trustee") + .build() + ) + .build(); + + msgList = new LinkedList(); + msgList.add(msg); + + readCallback = new ReadCallback(msgList); + + bulletinBoardClient.readMessages(filterList, readCallback); + try { + jobSemaphore.acquire(2); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + } + + /** + * Tests posting a batch by parts + * Also tests not being able to post to a closed batch + * @throws CommunicationException, SignatureException, InterruptedException + */ + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + final int SIGNER = 1; + final int BATCH_ID = 100; + final int BATCH_LENGTH = 100; + + CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); + + // Begin batch + + bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback); + + jobSemaphore.acquire(); + + // Post data + + bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback); + + jobSemaphore.acquire(); + + // Close batch + + CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() + .setBatchId(BATCH_ID) + .setBatchLength(BATCH_LENGTH) + .setSig(completeBatch.getSignature()) + .build(); + + bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); + + jobSemaphore.acquire(); + + // Attempt to open batch again + + bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback); + + // Attempt to add batch data + + bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback); + + jobSemaphore.acquire(2); + + // Read batch data + + BatchSpecificationMessage batchSpecificationMessage = + BatchSpecificationMessage.newBuilder() + .setSignerId(signerIDs[SIGNER]) + .setBatchId(BATCH_ID) + .setStartPosition(0) + .build(); + + readBatchCallback = new ReadBatchCallback(completeBatch); + + bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + + jobSemaphore.acquire(); + + } + + /** + * Posts a complete batch message + * Checks reading od the message + * @throws CommunicationException, SignatureException, InterruptedException + */ + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + final int SIGNER = 0; + final int BATCH_ID = 101; + final int BATCH_LENGTH = 50; + + // Post batch + + CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); + + bulletinBoardClient.postBatch(completeBatch,postCallback); + + jobSemaphore.acquire(); + + // Read batch + + BatchSpecificationMessage batchSpecificationMessage = + BatchSpecificationMessage.newBuilder() + .setSignerId(signerIDs[SIGNER]) + .setBatchId(BATCH_ID) + .setStartPosition(0) + .build(); + + readBatchCallback = new ReadBatchCallback(completeBatch); + + bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + + jobSemaphore.acquire(); + + } + + /** + * Tests that an unopened batch cannot be closed + * @throws CommunicationException, InterruptedException + */ + @Test + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + final int NON_EXISTENT_BATCH_ID = 999; + + CloseBatchMessage closeBatchMessage = + CloseBatchMessage.newBuilder() + .setBatchId(NON_EXISTENT_BATCH_ID) + .setBatchLength(1) + .setSig(Crypto.Signature.getDefaultInstance()) + .build(); + + // Try to close the (unopened) batch; + + bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback); + + jobSemaphore.acquire(); + + } + +} diff --git a/bulletin-board-server/build.gradle b/bulletin-board-server/build.gradle index 21604d1..59045a2 100644 --- a/bulletin-board-server/build.gradle +++ b/bulletin-board-server/build.gradle @@ -48,7 +48,7 @@ dependencies { // JDBC connections compile 'org.springframework:spring-jdbc:4.2.+' - compile 'org.xerial:sqlite-jdbc:3.7.+' + compile 'org.xerial:sqlite-jdbc:3.8.+' compile 'mysql:mysql-connector-java:5.1.+' compile 'com.h2database:h2:1.0.+' @@ -89,6 +89,11 @@ task h2Test(type: Test) { outputs.upToDateWhen { false } } +task liteTest(type: Test) { + include '**/*SQLite*Test*' + outputs.upToDateWhen { false } +} + task dbTest(type: Test) { include '**/*H2*Test*' include '**/*MySQL*Test*' diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index 5b4fac9..a6313b6 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -177,8 +177,9 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ case MSG_ID: return MSG_ID; - case EXACT_ENTRY: // Go through - case MAX_ENTRY: + case EXACT_ENTRY: // Go through + case MAX_ENTRY: // Go through + case MIN_ENTRY: return ENTRY_NUM; case SIGNER_ID: @@ -253,12 +254,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ switch (messageFilter.getType()) { - case MSG_ID: // Go through + case MSG_ID: // Go through case SIGNER_ID: return messageFilter.getId().toByteArray(); - case EXACT_ENTRY: // Go through - case MAX_ENTRY: + case EXACT_ENTRY: // Go through + case MAX_ENTRY: // Go through + case MIN_ENTRY: return messageFilter.getEntry(); case TAG: @@ -653,7 +655,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ for (int i=0 ; i < tags.length ; i++) { namedParameters[i] = new MapSqlParameterSource(); - namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(0),message.getSignerId()); + namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(0),message.getSignerId().toByteArray()); namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(1),message.getBatchId()); namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(2),tags[i]); } @@ -675,7 +677,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ String sql = sqlQueryProvider.getSQLString(QueryType.INSERT_BATCH_DATA); MapSqlParameterSource namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(0),batchMessage.getSignerId()); + namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(0),batchMessage.getSignerId().toByteArray()); namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(1),batchMessage.getBatchId()); namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(2),batchMessage.getSerialNum()); namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(3),batchMessage.getData().toByteArray()); @@ -700,7 +702,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ MapSqlParameterSource namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(0),signerId); + namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(0),signerId.toByteArray()); namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(1),batchId); List lengthResult = jdbcTemplate.query(sql, namedParameters, new LongMapper()); @@ -733,7 +735,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA); namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),signerId); + namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),signerId.toByteArray()); namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),batchId); namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),0); // Read from the beginning @@ -775,7 +777,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ sql = sqlQueryProvider.getSQLString(QueryType.REMOVE_BATCH_TAGS); namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(0), signerId); + namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(0), signerId.toByteArray()); namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(1), batchId); jdbcTemplate.update(sql, namedParameters); @@ -786,7 +788,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } @Override - public List readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException{ + public BatchDataList readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException{ // Check that batch is closed if (!isBatchClosed(message.getSignerId(), message.getBatchId())) { @@ -796,11 +798,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ String sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA); MapSqlParameterSource namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),message.getSignerId()); + namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),message.getSignerId().toByteArray()); namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),message.getBatchId()); namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),message.getStartPosition()); - return jdbcTemplate.query(sql, namedParameters, new BatchDataMapper()); + return BatchDataList.newBuilder() + .addAllData(jdbcTemplate.query(sql, namedParameters, new BatchDataMapper())) + .build(); } @Override diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java index 14bf9e2..659d9c3 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java @@ -134,6 +134,8 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider return "MsgTable.EntryNum = :EntryNum" + serialString; case MAX_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MIN_ENTRY: + return "MsgTable.EntryNum >= :EntryNum" + serialString; case MAX_MESSAGES: return "LIMIT :Limit" + serialString; case MSG_ID: @@ -157,6 +159,7 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider switch(filterType) { case EXACT_ENTRY: // Go through case MAX_ENTRY: // Go through + case MIN_ENTRY: // Go through case MAX_MESSAGES: return "INT"; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java index c8357a3..e3bdef0 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/MySQLQueryProvider.java @@ -151,6 +151,8 @@ public class MySQLQueryProvider implements SQLQueryProvider { return "MsgTable.EntryNum = :EntryNum" + serialString; case MAX_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MIN_ENTRY: + return "MsgTable.EntryNum >= :EntryNum" + serialString; case MAX_MESSAGES: return "LIMIT :Limit" + serialString; case MSG_ID: @@ -174,6 +176,7 @@ public class MySQLQueryProvider implements SQLQueryProvider { switch(filterType) { case EXACT_ENTRY: // Go through case MAX_ENTRY: // Go through + case MIN_ENTRY: // Go through case MAX_MESSAGES: return "INT"; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java index d796789..b581b87 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java @@ -54,6 +54,8 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi return "MsgTable.EntryNum = :EntryNum" + serialString; case MAX_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MIN_ENTRY: + return "MsgTable.EntryNum <= :EntryNum" + serialString; case MAX_MESSAGES: return "LIMIT = :Limit" + serialString; case MSG_ID: @@ -77,6 +79,7 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi switch(filterType) { case EXACT_ENTRY: // Go through case MAX_ENTRY: // Go through + case MIN_ENTRY: // Go through case MAX_MESSAGES: return "INTEGER"; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index fe3d2fc..766af19 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -106,6 +106,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL @Override public BoolMsg beginBatch(BeginBatchMessage message) { try { + init(); return bulletinBoard.beginBatch(message); } catch (CommunicationException e) { System.err.println(e.getMessage()); @@ -120,6 +121,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL @Override public BoolMsg postBatchMessage(BatchMessage batchMessage) { try { + init(); return bulletinBoard.postBatchMessage(batchMessage); } catch (CommunicationException e) { System.err.println(e.getMessage()); @@ -134,6 +136,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL @Override public BoolMsg closeBatchMessage(CloseBatchMessage message) { try { + init(); return bulletinBoard.closeBatchMessage(message); } catch (CommunicationException e) { System.err.println(e.getMessage()); @@ -146,8 +149,9 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL @Consumes(MEDIATYPE_PROTOBUF) @Produces(MEDIATYPE_PROTOBUF) @Override - public List readBatch(BatchSpecificationMessage message) { + public BatchDataList readBatch(BatchSpecificationMessage message) { try { + init(); return bulletinBoard.readBatch(message); } catch (CommunicationException | IllegalArgumentException e) { System.err.println(e.getMessage()); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index 7732fcb..c6e330c 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -1,5 +1,6 @@ package meerkat.bulletinboard; +import com.google.common.util.concurrent.FutureCallback; import com.google.protobuf.ByteString; import meerkat.protobuf.BulletinBoardAPI.*; @@ -10,11 +11,6 @@ import java.util.List; */ public interface AsyncBulletinBoardClient extends BulletinBoardClient { - public interface ClientCallback { - void handleCallback(T msg); - void handleFailure(Throwable t); - } - public interface MessageHandler { void handleNewMessages(List messageList); } @@ -25,7 +21,7 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param callback is a class containing methods to handle the result of the operation * @return a unique message ID for the message, that can be later used to retrieve the batch */ - public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback); + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback); /** * Perform an end-to-end post of a signed batch message @@ -33,14 +29,14 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param callback is a class containing methods to handle the result of the operation * @return a unique identifier for the batch message */ - public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback); + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback); /** * This message informs the server about the existence of a new batch message and supplies it with the tags associated with it * @param beginBatchMessage contains the data required to begin the batch * @param callback is a callback function class for handling results of the operation */ - public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback callback); + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback); /** * This method posts batch data into an (assumed to be open) batch @@ -54,30 +50,30 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param callback is a callback function class for handling results of the operation */ public void postBatchData(byte[] signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback); + int startPosition, FutureCallback callback); /** * Overloading of the postBatchData method which starts at the first position in the batch */ - public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback); + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback); /** * Overloading of the postBatchData method which uses ByteString */ public void postBatchData(ByteString signerId, int batchId, List batchDataList, - int startPosition, ClientCallback callback); + int startPosition, FutureCallback callback); /** * Overloading of the postBatchData method which uses ByteString and starts at the first position in the batch */ - public void postBatchData(ByteString signerId, int batchId, List batchDataList, ClientCallback callback); + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback); /** * Attempts to close a batch message * @param closeBatchMessage contains the data required to close the batch * @param callback is a callback function class for handling results of the operation */ - public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback); + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback); /** * Check how "safe" a given message is in an asynchronous manner @@ -85,7 +81,7 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param id is the unique message identifier for retrieval * @param callback is a callback function class for handling results of the operation */ - public void getRedundancy(MessageID id, ClientCallback callback); + public void getRedundancy(MessageID id, FutureCallback callback); /** * Read all messages posted matching the given filter in an asynchronous manner @@ -95,14 +91,14 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param filterList return only messages that match the filters (null means no filtering). * @param callback is a callback function class for handling results of the operation */ - public void readMessages(MessageFilterList filterList, ClientCallback> callback); + public void readMessages(MessageFilterList filterList, FutureCallback> callback); /** * Read a given batch message from the bulletin board * @param batchSpecificationMessage contains the data required to specify a single batch instance * @param callback is a callback class for handling the result of the operation */ - public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback callback); + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback); /** * Subscribes to a notifier that will return any new messages on the server that match the given filters diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index 70721a7..cbf06ff 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -75,7 +75,7 @@ public interface BulletinBoardServer{ * @throws CommunicationException on DB connection error * @throws IllegalArgumentException if message does not specify a batch */ - public List readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException; + public BatchDataList readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException; /** * This method closes the connection to the DB diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java index 19c57e3..227c7ca 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java @@ -2,6 +2,7 @@ package meerkat.bulletinboard; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.*; +import meerkat.util.BulletinBoardMessageComparator; import java.util.LinkedList; import java.util.List; @@ -48,6 +49,14 @@ public class CompleteBatch { return signature; } + public CloseBatchMessage getCloseBatchMessage() { + return CloseBatchMessage.newBuilder() + .setBatchId(getBeginBatchMessage().getBatchId()) + .setBatchLength(getBatchDataList().size()) + .setSig(getSignature()) + .build(); + } + public void setBeginBatchMessage(BeginBatchMessage beginBatchMessage) { this.beginBatchMessage = beginBatchMessage; } @@ -64,4 +73,45 @@ public class CompleteBatch { signature = newSignature; } + @Override + public boolean equals(Object other) { + + if (!(other instanceof CompleteBatch)) { + return false; + } + + CompleteBatch otherBatch = (CompleteBatch) other; + + boolean result = true; + + if (beginBatchMessage == null) { + if (otherBatch.getBeginBatchMessage() != null) + return false; + } else { + result = result && beginBatchMessage.equals(otherBatch.getBeginBatchMessage()); + } + + if (batchDataList == null) { + if (otherBatch.getBatchDataList() != null) + return false; + } else { + result = result && batchDataList.equals(otherBatch.getBatchDataList()); + } + + if (signature == null) { + if (otherBatch.getSignature() != null) + return false; + } else { + result = result && signature.equals(otherBatch.getSignature()); + } + + return result; + + } + + @Override + public String toString() { + return "Batch " + beginBatchMessage.getSignerId().toString() + ":" + beginBatchMessage.getBatchId(); + } + } diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java new file mode 100644 index 0000000..b19bab3 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java @@ -0,0 +1,65 @@ +package meerkat.util; + +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 16-Feb-16. + */ +public class BulletinBoardUtils { + + /** + * Searches the tags in the message for one that begins with given prefix + * @param message is the message to search + * @param prefix is the given prefix + * @return the tag without the prefix, if found, or null if not found + */ + public static String findTagWithPrefix(BulletinBoardMessage message, String prefix) { + + for (String tag : message.getMsg().getTagList()){ + if (tag.startsWith(prefix)) { + return tag.substring(prefix.length()); + } + } + + return null; + + } + + /** + * Searches the tags in a message for tags that do not contain a given list of prefixes + * @param message is the message to search + * @param prefixes is the list of prefixes + * @return a list of the tags that do *not* contain any of the given prefixes + */ + public static List removePrefixTags(BulletinBoardMessage message, Iterable prefixes) { + + if (prefixes == null) + return message.getMsg().getTagList(); + + List result = new LinkedList<>(); + + for (String tag : message.getMsg().getTagList()){ + + boolean found = false; + + for (String prefix : prefixes){ + if (tag.startsWith(prefix)){ + found = true; + break; + } + } + + if (!found) { + result.add(tag); + } + + } + + return result; + + } + +} diff --git a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto index 2ae4068..b86debd 100644 --- a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto +++ b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto @@ -50,13 +50,14 @@ enum FilterType { MSG_ID = 0; // Match exact message ID EXACT_ENTRY = 1; // Match exact entry number in database (chronological) MAX_ENTRY = 2; // Find all entries in database up to specified entry number (chronological) - SIGNER_ID = 3; // Find all entries in database that correspond to specific signature (signer) - TAG = 4; // Find all entries in database that have a specific tag + MIN_ENTRY = 3; // Find all entries in database starting from specified entry number (chronological) + SIGNER_ID = 4; // Find all entries in database that correspond to specific signature (signer) + TAG = 5; // Find all entries in database that have a specific tag // NOTE: The MAX_MESSAGES filter must remain the last filter type // This is because the condition it specifies in an SQL statement must come last in the statement // Keeping it last here allows for easily sorting the filters and keeping the code general - MAX_MESSAGES = 5; // Return at most some specified number of messages + MAX_MESSAGES = 6; // Return at most some specified number of messages } message MessageFilter { @@ -98,6 +99,11 @@ message BatchData { bytes data = 1; } +// List of BatchData; Only used for testing +message BatchDataList { + repeated BatchData data = 1; +} + // These messages comprise a batch message message BatchMessage { bytes signerId = 1; // Unique signer identifier