diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index 31521e1..6594b0c 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -3,9 +3,7 @@ package meerkat.bulletinboard; import com.google.common.util.concurrent.FutureCallback; import com.google.protobuf.Timestamp; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; -import meerkat.protobuf.Comm; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Voting.*; @@ -267,9 +265,9 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien } @Override - public void readBatch(final MessageID msgID, final FutureCallback callback) { + public void readMessage(final MessageID msgID, final FutureCallback callback) { - localClient.readBatch(msgID, new FutureCallback() { + localClient.readMessage(msgID, new FutureCallback() { @Override public void onSuccess(BulletinBoardMessage result) { @@ -282,7 +280,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien // Read from local unsuccessful: try to read from remote - remoteClient.readBatch(msgID, new FutureCallback() { + remoteClient.readMessage(msgID, new FutureCallback() { @Override public void onSuccess(BulletinBoardMessage result) { @@ -398,17 +396,17 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien } @Override - public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException { + public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException { BulletinBoardMessage result = null; try { - result = localClient.readBatch(msgID); + result = localClient.readMessage(msgID); } catch (CommunicationException e) { //TODO: log } if (result == null){ - result = remoteClient.readBatch(msgID); + result = remoteClient.readMessage(msgID); if (result != null){ localClient.postMessage(result); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index f09b2d3..2b904ed 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -183,6 +183,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } + batchId.setLength(i); + return true; } @@ -244,6 +246,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() .setBatchId(identifier.getBatchId().getValue()) + .setBatchLength(identifier.getLength()) .setTimestamp(timestamp) .addAllSig(signatures) .build(); @@ -438,7 +441,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override public BulletinBoardMessage call() throws Exception { - // Read message stub + // Read message (mat be a stub) MessageFilterList filterList = MessageFilterList.newBuilder() .addFilter(MessageFilter.newBuilder() @@ -451,19 +454,25 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo List bulletinBoardMessages = messageReader.call(); if (bulletinBoardMessages.size() <= 0) { - throw new NotFoundException("Batch does not exist"); + throw new NotFoundException("Message does not exist"); } - BulletinBoardMessage stub = bulletinBoardMessages.get(0); + BulletinBoardMessage msg = bulletinBoardMessages.get(0); - // Read data + if (msg.getMsg().getDataTypeCase() == UnsignedBulletinBoardMessage.DataTypeCase.MSGID) { - BatchDataReader batchDataReader = new BatchDataReader(msgID); - List batchChunkList = batchDataReader.call(); + // Read data - // Combine and return + BatchDataReader batchDataReader = new BatchDataReader(msgID); + List batchChunkList = batchDataReader.call(); - return BulletinBoardUtils.gatherBatch(stub, batchChunkList); + // Combine and return + + return BulletinBoardUtils.gatherBatch(msg, batchChunkList); + + } else { + return msg; + } } @@ -493,7 +502,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } @Override - public void readBatch(MessageID msgID, FutureCallback callback) { + public void readMessage(MessageID msgID, FutureCallback callback) { Futures.addCallback(executorService.submit(new CompleteBatchReader(msgID)), callback); } @@ -591,7 +600,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo } @Override - public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException { + public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException { MessageFilterList filterList = MessageFilterList.newBuilder() .addFilter(MessageFilter.newBuilder() @@ -617,20 +626,14 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo throw new IllegalArgumentException("Message is not a stub and does not contain the required message ID"); } - MessageID msgID = MessageID.newBuilder().setID(stub.getMsg().getMsgId()).build(); - - BatchDataReader batchDataReader = new BatchDataReader(msgID); - - List batchChunkList = null; + BatchDataCombiner combiner = new BatchDataCombiner(stub); try { - batchChunkList = batchDataReader.call(); + return combiner.call(); } catch (Exception e) { throw new CommunicationException(e.getCause() + " " + e.getMessage()); } - return BulletinBoardUtils.gatherBatch(stub, batchChunkList); - } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 6250784..521815d 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -224,7 +224,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ } @Override - public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException { + public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException { MessageFilterList filterList = MessageFilterList.newBuilder() .addFilter(MessageFilter.newBuilder() diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java index e8abf6b..5cdf73b 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -36,75 +36,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private Semaphore semaphore; - /** - * This class is a callback that deletes a message if it has been successfully posted - * It also calls a stored callback - */ - private class MessageDeleteCallback implements FutureCallback { - - private final long entryNum; - private final FutureCallback callback; - - public MessageDeleteCallback(long entryNum, FutureCallback callback) { - this.entryNum = entryNum; - this.callback = callback; - } - - @Override - public void onSuccess(Boolean result) { - // Success: delete from database - localClient.deleteMessage(entryNum, null); - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - - } - - /** - * This class aggregates the results from all of the post operations - * If any post has failed: it changes the sync status to SERVER_ERROR - * It also notifies the main sync loop when all uploads are finished - */ - private class SyncStatusUpdateCallback implements FutureCallback { - - private int count; - private boolean errorEncountered; - - public SyncStatusUpdateCallback(int count) { - this.count = count; - this.errorEncountered = false; - } - - private void handleStatusUpdate() { - count--; - if (count <= 0) { - - if (errorEncountered) - updateSyncStatus(SyncStatus.SERVER_ERROR); - - // Upload is done: wake up the synchronizer loop - semaphore.release(); - - } - } - - @Override - public void onSuccess(Void result) { - handleStatusUpdate(); - } - - @Override - public void onFailure(Throwable t) { - errorEncountered = true; - handleStatusUpdate(); - } - - } - private class SyncCallback implements FutureCallback> { @Override @@ -122,8 +53,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize // Handle upload and status change - SyncStatusUpdateCallback syncStatusUpdateCallback = new SyncStatusUpdateCallback(result.size()); - SyncStatus newStatus = SyncStatus.PENDING; if (result.size() == 0) { @@ -143,13 +72,17 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize BulletinBoardMessage completeMsg = localClient.readBatchData(message); - remoteClient.postMessage(completeMsg, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback)); + remoteClient.postMessage(completeMsg); + + localClient.deleteMessage(completeMsg.getEntryNum()); } else { // This is a regular message: post it - remoteClient.postMessage(message, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback)); + remoteClient.postMessage(message); + + localClient.deleteMessage(message.getEntryNum()); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index b985a8a..86ede70 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -7,6 +7,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import meerkat.bulletinboard.workers.singleserver.*; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; import meerkat.protobuf.Voting.BulletinBoardClientParams; @@ -166,91 +168,74 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } } + private class ReadBatchCallback implements FutureCallback> { + + private final BulletinBoardMessage stub; + private final FutureCallback callback; + + public ReadBatchCallback(BulletinBoardMessage stub, FutureCallback callback) { + this.stub = stub; + this.callback = callback; + } + + @Override + public void onSuccess(List result) { + callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, result)); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + + } + /** - * This callback ties together the different parts of a CompleteBatch as they arrive from the server - * It assembles a CompleteBatch from the parts and sends it to the user if all parts arrived - * If any part fails to arrive: it invokes the onFailure method + * This callback receives a message which may be a stub + * If the message is not a stub: it returns it as is to a callback function + * If it is a stub: it schedules a read of the batch data which will return a complete message to the callback function */ - class CompleteBatchReadCallback { + class CompleteMessageReadCallback implements FutureCallback>{ private final FutureCallback callback; - private List batchChunkList; - private BulletinBoardMessage stub; - - private AtomicInteger remainingQueries; - private AtomicBoolean failed; - - public CompleteBatchReadCallback(FutureCallback callback) { + public CompleteMessageReadCallback(FutureCallback callback) { this.callback = callback; - remainingQueries = new AtomicInteger(2); - failed = new AtomicBoolean(false); - } - protected void combineAndReturn() { + @Override + public void onSuccess(List result) { + if (result.size() <= 0) { + onFailure(new CommunicationException("Could not find required message on the server.")); + } else { - if (remainingQueries.decrementAndGet() == 0){ + BulletinBoardMessage msg = result.get(0); - if (callback != null) - callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, batchChunkList)); - } + if (msg.getMsg().getDataTypeCase() != UnsignedBulletinBoardMessage.DataTypeCase.MSGID) { + callback.onSuccess(msg); + } else { - } + // Create job with MAX retries for retrieval of the Batch Data List - protected void fail(Throwable t) { - if (failed.compareAndSet(false, true)) { - if (callback != null) - callback.onFailure(t); + BatchQuery batchQuery = BatchQuery.newBuilder() + .setMsgID(MessageID.newBuilder() + .setID(msg.getMsg().getMsgId()) + .build()) + .build(); + + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); + + scheduleWorker(batchWorker, new ReadBatchCallback(msg, callback)); + + } } } - /** - * @return a FutureCallback for the Batch Data List that ties to this object - */ - public FutureCallback> asBatchDataListFutureCallback() { - return new FutureCallback>() { - - @Override - public void onSuccess(List result) { - batchChunkList = result; - - combineAndReturn(); - } - - @Override - public void onFailure(Throwable t) { - fail(t); - } - - }; - } - - /** - * @return a FutureCallback for the Bulletin Board Message that ties to this object - */ - public FutureCallback> asBulletinBoardMessageListFutureCallback() { - return new FutureCallback>() { - - @Override - public void onSuccess(List result) { - if (result.size() < 1){ - onFailure(new IllegalArgumentException("Server returned empty message list")); - return; - } - - stub = result.get(0); - - combineAndReturn(); - } - - @Override - public void onFailure(Throwable t) { - fail(t); - } - }; + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); } } @@ -574,9 +559,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void readBatch(MessageID msgID, FutureCallback callback) { + public void readMessage(MessageID msgID, FutureCallback callback) { - // Create job with MAX retries for retrieval of the Bulletin Board Message that defines the batch + // Create job with MAX retries for retrieval of the Bulletin Board Message (which may be a stub) MessageFilterList filterList = MessageFilterList.newBuilder() .addFilter(MessageFilter.newBuilder() @@ -592,39 +577,11 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, MAX_RETRIES); - // Create job with MAX retries for retrieval of the Batch Data List - SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); - - // Create callback that will combine the two worker products - CompleteBatchReadCallback completeBatchReadCallback = new CompleteBatchReadCallback(callback); - // Submit jobs with wrapped callbacks - scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, completeBatchReadCallback.asBulletinBoardMessageListFutureCallback())); - scheduleWorker(batchWorker, new RetryCallback<>(batchWorker, completeBatchReadCallback.asBatchDataListFutureCallback())); + scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, new CompleteMessageReadCallback(callback))); } - private class ReadBatchCallback implements FutureCallback> { - - private final BulletinBoardMessage stub; - private final FutureCallback callback; - - public ReadBatchCallback(BulletinBoardMessage stub, FutureCallback callback) { - this.stub = stub; - this.callback = callback; - } - - @Override - public void onSuccess(List result) { - callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, result)); - } - - @Override - public void onFailure(Throwable t) { - - } - } - @Override public void readBatchData(BulletinBoardMessage stub, FutureCallback callback) throws IllegalArgumentException{ diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index 7f8232f..5c1cab4 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -4,7 +4,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.protobuf.Timestamp; import meerkat.bulletinboard.workers.multiserver.*; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Voting.*; @@ -208,11 +207,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } @Override - public void readBatch(MessageID msgID, FutureCallback callback) { + public void readMessage(MessageID msgID, FutureCallback callback) { //Create job - MultiServerReadBatchWorker worker = - new MultiServerReadBatchWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback); + MultiServerReadMessageWorker worker = + new MultiServerReadMessageWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback); // Submit job executorService.submit(worker); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java index 959c60f..ab5fd40 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java @@ -22,7 +22,7 @@ public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker @Override protected void doRead(MessageID payload, SingleServerBulletinBoardClient client) { - client.readBatch(payload, this); + client.readMessage(payload, this); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessageWorker.java similarity index 56% rename from bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java rename to bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessageWorker.java index 59b4ce5..f84d67e 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadMessageWorker.java @@ -11,11 +11,11 @@ import java.util.List; /** * Created by Arbel Deutsch Peled on 27-Dec-15. */ -public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker { +public class MultiServerReadMessageWorker extends MultiServerGenericReadWorker { - public MultiServerReadBatchWorker(List clients, - int minServers, MessageID payload, int maxRetry, - FutureCallback futureCallback) { + public MultiServerReadMessageWorker(List clients, + int minServers, MessageID payload, int maxRetry, + FutureCallback futureCallback) { super(clients, minServers, payload, maxRetry, futureCallback); @@ -23,7 +23,7 @@ public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker 0) { - for (Throwable t : thrown) - System.err.println(t.getMessage()); - assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false); - } - } @After public void close() { + if (thrown.size() > 0) { + for (Throwable t : thrown) { + System.err.println(t.getMessage()); + } + assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false); + } + synchronizer.stop(); localClient.close(); remoteClient.close(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java index fa708d5..5e60957 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java @@ -5,9 +5,13 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import meerkat.comm.CommunicationException; import meerkat.crypto.concrete.ECDSASignature; +import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardMessageGenerator; +import meerkat.util.BulletinBoardUtils; +import meerkat.bulletinboard.AsyncBulletinBoardClient.BatchIdentifier; import java.io.IOException; import java.io.InputStream; @@ -27,7 +31,7 @@ public class GenericBulletinBoardClientTester { // Signature resources - private GenericBatchDigitalSignature signers[]; + private BulletinBoardSignature signers[]; private ByteString[] signerIDs; private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; @@ -48,13 +52,15 @@ public class GenericBulletinBoardClientTester { private RedundancyCallback redundancyCallback; private ReadCallback readCallback; - private ReadBatchCallback readBatchCallback; // Sync and misc private Semaphore jobSemaphore; private Vector thrown; private Random random; + private BulletinBoardMessageGenerator generator; + + private BulletinBoardDigest digest; // Constructor @@ -62,10 +68,10 @@ public class GenericBulletinBoardClientTester { this.bulletinBoardClient = bulletinBoardClient; - signers = new GenericBatchDigitalSignature[2]; + signers = new GenericBulletinBoardSignature[2]; signerIDs = new ByteString[signers.length]; - signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); - signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[0] = new GenericBulletinBoardSignature(new ECDSASignature()); + signers[1] = new GenericBulletinBoardSignature(new ECDSASignature()); InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); char[] password = KEYFILE_PASSWORD1.toCharArray(); @@ -107,6 +113,10 @@ public class GenericBulletinBoardClientTester { fail("Couldn't find signing key " + e.getMessage()); } + this.random = new Random(0); + this.generator = new BulletinBoardMessageGenerator(random); + this.digest = new GenericBulletinBoardDigest(new SHA256Digest()); + } // Callback definitions @@ -137,16 +147,21 @@ public class GenericBulletinBoardClientTester { @Override public void onSuccess(Boolean msg) { + System.err.println("Post operation completed"); - jobSemaphore.release(); - //TODO: Change Assert mechanism to exception one + if (isAssert) { - if (assertValue) { - assertThat("Post operation failed", msg, is(Boolean.TRUE)); + if (assertValue && !msg) { + genericHandleFailure(new AssertionError("Post operation failed")); + } else if (!assertValue && msg){ + genericHandleFailure(new AssertionError("Post operation succeeded unexpectedly")); } else { - assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE)); + jobSemaphore.release(); } + } else { + jobSemaphore.release(); } + } @Override @@ -209,21 +224,24 @@ public class GenericBulletinBoardClientTester { } } - private class ReadBatchCallback implements FutureCallback { + private class ReadBatchCallback implements FutureCallback{ - private CompleteBatch expectedBatch; + private BulletinBoardMessage expectedMsg; - public ReadBatchCallback(CompleteBatch expectedBatch) { - this.expectedBatch = expectedBatch; + public ReadBatchCallback(BulletinBoardMessage expectedMsg) { + this.expectedMsg = expectedMsg; } @Override - public void onSuccess(CompleteBatch batch) { + public void onSuccess(BulletinBoardMessage msg) { - System.err.println(batch); - jobSemaphore.release(); + BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); - assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch))); + if (msgComparator.compare(msg, expectedMsg) != 0) { + genericHandleFailure(new AssertionError("Batch read returned different message.\nExpected:" + expectedMsg + "\nRecieved:" + msg + "\n")); + } else { + jobSemaphore.release(); + } } @@ -233,59 +251,6 @@ public class GenericBulletinBoardClientTester { } } - // Randomness generators - - private byte randomByte(){ - return (byte) random.nextInt(); - } - - private byte[] randomByteArray(int length) { - - byte[] randomBytes = new byte[length]; - - for (int i = 0; i < length ; i++){ - randomBytes[i] = randomByte(); - } - - return randomBytes; - - } - - private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException { - - CompleteBatch completeBatch = new CompleteBatch(); - - // Create data - - completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() - .setSignerId(signerIDs[signer]) - .setBatchId(batchId) - .addTag("Test") - .build()); - - for (int i = 0 ; i < length ; i++){ - - BatchChunk batchChunk = BatchChunk.newBuilder() - .setData(ByteString.copyFrom(randomByteArray(i))) - .build(); - - completeBatch.appendBatchData(batchChunk); - - } - - completeBatch.setTimestamp(Timestamp.newBuilder() - .setSeconds(Math.abs(90)) - .setNanos(50) - .build()); - - signers[signer].updateContent(completeBatch); - - completeBatch.setSignature(signers[signer].sign()); - - return completeBatch; - - } - // Test methods /** @@ -310,7 +275,13 @@ public class GenericBulletinBoardClientTester { public void close() { if (thrown.size() > 0) { + + for (Throwable t : thrown){ + System.err.println(t.getMessage()); + } + assert false; + } } @@ -394,59 +365,54 @@ public class GenericBulletinBoardClientTester { /** * Tests posting a batch by parts - * Also tests not being able to post to a closed batch * @throws CommunicationException, SignatureException, InterruptedException */ public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { - final int SIGNER = 1; - final int BATCH_ID = 100; final int BATCH_LENGTH = 100; + final int CHUNK_SIZE = 10; + final int TAG_NUM = 10; - CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); + final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM); // Begin batch - bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback); + bulletinBoardClient.beginBatch(msg.getMsg().getTagList(), new FutureCallback() { + + @Override + public void onSuccess(final BatchIdentifier identifier) { + + bulletinBoardClient.postBatchData(identifier, BulletinBoardUtils.breakToBatch(msg, CHUNK_SIZE), new FutureCallback() { + + @Override + public void onSuccess(Boolean result) { + + bulletinBoardClient.closeBatch(identifier, msg.getMsg().getTimestamp(), msg.getSigList(), postCallback); + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + + }); + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + + }); jobSemaphore.acquire(); - // Post data + digest.reset(); + digest.update(msg); - bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback); - - jobSemaphore.acquire(); - - // Close batch - - CloseBatchMessage closeBatchMessage = completeBatch.getCloseBatchMessage(); - - bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); - - jobSemaphore.acquire(); - - // Attempt to open batch again - - bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback); - - // Attempt to add batch data - - bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback); - - jobSemaphore.acquire(2); - - // Read batch data - - BatchSpecificationMessage batchSpecificationMessage = - BatchSpecificationMessage.newBuilder() - .setSignerId(signerIDs[SIGNER]) - .setBatchId(BATCH_ID) - .setStartPosition(0) - .build(); - - readBatchCallback = new ReadBatchCallback(completeBatch); - - bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + bulletinBoardClient.readMessage(digest.digestAsMessageID(), new ReadBatchCallback(msg)); jobSemaphore.acquire(); @@ -454,62 +420,56 @@ public class GenericBulletinBoardClientTester { /** * Posts a complete batch message - * Checks reading of the message + * Checks reading of the message in two parts * @throws CommunicationException, SignatureException, InterruptedException */ public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { - final int SIGNER = 0; - final int BATCH_ID = 101; - final int BATCH_LENGTH = 50; + final int BATCH_LENGTH = 100; + final int CHUNK_SIZE = 99; + final int TAG_NUM = 8; + + final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM); // Post batch - CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); - - bulletinBoardClient.postBatch(completeBatch,postCallback); + MessageID msgID = bulletinBoardClient.postAsBatch(msg, CHUNK_SIZE, postCallback); jobSemaphore.acquire(); // Read batch - BatchSpecificationMessage batchSpecificationMessage = - BatchSpecificationMessage.newBuilder() - .setSignerId(signerIDs[SIGNER]) - .setBatchId(BATCH_ID) - .setStartPosition(0) - .build(); + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgID.getID()) + .build()) + .build(); - readBatchCallback = new ReadBatchCallback(completeBatch); + bulletinBoardClient.readMessages(filterList, new FutureCallback>() { - bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + @Override + public void onSuccess(List msgList) { - jobSemaphore.acquire(); + if (msgList.size() != 1) { - } + genericHandleFailure(new AssertionError("Wrong number of stubs returned. Expected: 1; Found: " + msgList.size())); - /** - * Tests that an unopened batch cannot be closed - * @throws CommunicationException, InterruptedException - */ - public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + } else { - final int NON_EXISTENT_BATCH_ID = 999; + BulletinBoardMessage retrievedMsg = msgList.get(0); + bulletinBoardClient.readBatchData(retrievedMsg, new ReadBatchCallback(msg)); - CloseBatchMessage closeBatchMessage = - CloseBatchMessage.newBuilder() - .setBatchId(NON_EXISTENT_BATCH_ID) - .setBatchLength(1) - .setSig(Crypto.Signature.getDefaultInstance()) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(9) - .setNanos(12) - .build()) - .build(); + } - // Try to stop the (unopened) batch; + } - bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback); + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + + }); jobSemaphore.acquire(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java index 52da797..ea74885 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -23,7 +23,7 @@ import static org.junit.Assert.fail; */ public class GenericSubscriptionClientTester { - private GenericBatchDigitalSignature signers[]; + private BulletinBoardSignature signers[]; private ByteString[] signerIDs; private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; @@ -47,10 +47,10 @@ public class GenericSubscriptionClientTester { this.bulletinBoardClient = bulletinBoardClient; - signers = new GenericBatchDigitalSignature[2]; + signers = new BulletinBoardSignature[2]; signerIDs = new ByteString[signers.length]; - signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); - signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[0] = new GenericBulletinBoardSignature(new ECDSASignature()); + signers[1] = new GenericBulletinBoardSignature(new ECDSASignature()); InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); char[] password = KEYFILE_PASSWORD1.toCharArray(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index 0ab5c67..1804c77 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -100,13 +100,6 @@ public class LocalBulletinBoardClientTest { } - @Test - public void testInvalidBatchClose() throws CommunicationException, InterruptedException { - - clientTest.testInvalidBatchClose(); - - } - @Test public void testSubscription() throws SignatureException, CommunicationException { subscriptionTester.init(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java index b69cf72..1a95e95 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java @@ -85,11 +85,4 @@ public class ThreadedBulletinBoardClientIntegrationTest { } - @Test - public void testInvalidBatchClose() throws CommunicationException, InterruptedException { - - clientTest.testInvalidBatchClose(); - - } - } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index dcea3e7..e9c910a 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -19,6 +19,7 @@ import meerkat.crypto.DigitalSignature; import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Comm; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Crypto.SignatureVerificationKey; @@ -546,9 +547,17 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ } + private void checkConnection() throws CommunicationException { + if (jdbcTemplate == null) { + throw new CommunicationException("DB connection not initialized"); + } + } + @Override public BoolValue postMessage(BulletinBoardMessage msg) throws CommunicationException { + checkConnection(); + // Perform a post, calculate the message ID and check the signature for authenticity if (postMessage(msg, null) != -1){ return BoolValue.newBuilder().setValue(true).build(); // Message was posted @@ -561,6 +570,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public BoolValue deleteMessage(MessageID msgID) throws CommunicationException { + checkConnection(); + String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ID); Map namedParameters = new HashMap(); @@ -577,6 +588,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public BoolValue deleteMessage(long entryNum) throws CommunicationException { + checkConnection(); + String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ENTRY); Map namedParameters = new HashMap(); @@ -702,6 +715,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public void readMessages(MessageFilterList filterList, MessageOutputStream out) throws CommunicationException { + checkConnection(); + BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder(); // SQL length is roughly 50 characters per filter + 50 for the query itself @@ -725,6 +740,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public Int32Value getMessageCount(MessageFilterList filterList) throws CommunicationException { + checkConnection(); + BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder(); // SQL length is roughly 50 characters per filter + 50 for the query itself @@ -793,6 +810,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public Int64Value beginBatch(BeginBatchMessage message) throws CommunicationException { + checkConnection(); + // Store tags String sql = sqlQueryProvider.getSQLString(QueryType.STORE_BATCH_TAGS); MapSqlParameterSource namedParameters = new MapSqlParameterSource(); @@ -814,6 +833,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public BoolValue postBatchMessage(BatchMessage batchMessage) throws CommunicationException{ + checkConnection(); + // Make sure batch is open if (!isBatchOpen(batchMessage.getBatchId())) { return BoolValue.newBuilder().setValue(false).build(); @@ -837,6 +858,7 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public BoolValue closeBatch(CloseBatchMessage message) throws CommunicationException { + checkConnection(); // Check batch size @@ -916,6 +938,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public void readBatch(BatchQuery batchQuery, MessageOutputStream out) throws CommunicationException, IllegalArgumentException{ + checkConnection(); + // Check that batch is closed if (!isBatchClosed(batchQuery.getMsgID())) { throw new IllegalArgumentException("No such batch"); @@ -950,7 +974,9 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ } @Override - public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) { + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException{ + + checkConnection(); if (generateSyncQueryParams == null || !generateSyncQueryParams.hasFilterList() @@ -1029,6 +1055,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override public SyncQueryResponse querySync(SyncQuery syncQuery) throws CommunicationException { + checkConnection(); + if (syncQuery == null){ return SyncQueryResponse.newBuilder() .setLastEntryNum(-1) diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index 42af8ef..ab71a76 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -87,19 +87,20 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * Read all messages posted matching the given filter in an asynchronous manner * Note that if messages haven't been "fully posted", this might return a different * set of messages in different calls. However, messages that are fully posted - * are guaranteed to be included. + * are guaranteed to be included * Also: batch messages are returned as stubs. - * @param filterList return only messages that match the filters (null means no filtering). + * @param filterList return only messages that match the filters (null means no filtering) * @param callback is a callback function class for handling results of the operation */ public void readMessages(MessageFilterList filterList, FutureCallback> callback); /** - * Read a given batch message from the bulletin board - * @param msgID is the batch message ID to be read + * Read a given message from the bulletin board + * If the message is a batch: returns a complete message containing the batch data as well as the metadata + * @param msgID is the ID of the message to be read * @param callback is a callback class for handling the result of the operation */ - public void readBatch(MessageID msgID, FutureCallback callback); + public void readMessage(MessageID msgID, FutureCallback callback); /** * Read batch data for a specific stub message diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 142bb35..7f29608 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -55,12 +55,13 @@ public interface BulletinBoardClient { MessageID postAsBatch(BulletinBoardMessage msg, int chunkSize) throws CommunicationException; /** - * Read a given batch message from the bulletin board - * @param msgID is the batch message ID to be read - * @return the complete batch + * Read a given message from the bulletin board + * If the message is a batch: returns a complete message containing the batch data as well as the metadata + * @param msgID is the ID of the message to be read + * @return the complete message * @throws CommunicationException if operation is unsuccessful */ - BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException; + BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException; /** * Read batch data for a specific stub message