From a7699086d847ba5af3036ebff62ea68be06e9aa6 Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Mon, 21 Mar 2016 20:32:57 +0200 Subject: [PATCH] Local Client for testing (without subscription yet) Partial implementation of subscriptions. Some bug fixes. --- .../SimpleBulletinBoardClient.java | 35 ++ .../SingleServerBulletinBoardClient.java | 51 +- .../ThreadedBulletinBoardClient.java | 11 +- .../SingleServerGetRedundancyWorker.java | 24 +- ...dedBulletinBoardClientIntegrationTest.java | 556 ------------------ .../LocalBulletinBoardClientTest.java | 107 ++++ .../sqlserver/BulletinBoardSQLServer.java | 96 ++- .../mappers/MessageCallbackHandler.java | 6 +- .../mappers/MessageStubCallbackHandler.java | 59 ++ .../webapp/BulletinBoardWebApp.java | 17 +- .../AsyncBulletinBoardClient.java | 13 +- .../bulletinboard/BulletinBoardClient.java | 15 +- .../bulletinboard/BulletinBoardConstants.java | 1 + .../bulletinboard/BulletinBoardServer.java | 11 +- .../BulletinBoardSubscriber.java | 32 + .../BulletinBoardSynchronizer.java | 22 + .../meerkat/bulletinboard/CompleteBatch.java | 5 + .../SubscriptionAsyncBulletinBoardClient.java | 7 + .../main/proto/meerkat/BulletinBoardAPI.proto | 12 + 19 files changed, 474 insertions(+), 606 deletions(-) delete mode 100644 bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java create mode 100644 bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java create mode 100644 bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 633e495..2ed113a 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -1,13 +1,21 @@ package meerkat.bulletinboard; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.crypto.Digest; import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; import meerkat.rest.*; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import javax.ws.rs.client.Client; @@ -129,6 +137,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ */ @Override public List readMessages(MessageFilterList filterList) { + WebTarget webTarget; Response response; BulletinBoardMessageList messageList; @@ -139,6 +148,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ } for (String db : meerkatDBs) { + try { webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); @@ -151,9 +161,34 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ } } catch (Exception e) {} + } return null; + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + + WebTarget webTarget; + Response response; + + for (String db : meerkatDBs) { + + try { + webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(GENERATE_SYNC_QUERY_PATH); + + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(generateSyncQueryParams, Constants.MEDIATYPE_PROTOBUF)); + + return response.readEntity(SyncQuery.class); + + } catch (Exception e) {} + + } + + throw new CommunicationException("Could not contact any server"); + } public void close() { diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 24cd6bf..34531cf 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -7,6 +7,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.util.BulletinBoardUtils; @@ -29,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; * If the list of servers contains more than one server: the server actually used is the first one * The class further implements a delayed access to the server after a communication error occurs */ -public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient { +public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { private final int MAX_RETRIES = 11; @@ -275,13 +276,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i class SubscriptionCallback implements FutureCallback> { private SingleServerReadMessagesWorker worker; - private final MessageHandler messageHandler; + private final FutureCallback> callback; private MessageFilterList.Builder filterBuilder; - public SubscriptionCallback(SingleServerReadMessagesWorker worker, MessageHandler messageHandler) { + public SubscriptionCallback(SingleServerReadMessagesWorker worker, FutureCallback> callback) { this.worker = worker; - this.messageHandler = messageHandler; + this.callback = callback; filterBuilder = worker.getPayload().toBuilder(); } @@ -290,7 +291,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void onSuccess(List result) { // Report new messages to user - messageHandler.handleNewMessages(result); + callback.onSuccess(result); // Remove last filter from list (MIN_ENTRY one) filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); @@ -315,14 +316,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Notify client about failure fail(); - // Reschedule exact same task - scheduleWorker(worker, this); + // Notify caller about failure and terminate subscription + callback.onFailure(t); } } - public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) { + public SingleServerBulletinBoardClient(ListeningScheduledExecutorService executorService, + long failDelayInMilliseconds, + long subscriptionIntervalInMilliseconds) { - executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); + this.executorService = executorService; this.failDelayInMilliseconds = failDelayInMilliseconds; this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; @@ -332,6 +335,14 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } + public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) { + + this(MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)), + failDelayInMilliseconds, + subscriptionIntervalInMilliseconds); + + } + /** * Stores database location, initializes the web Client and * @param clientParams contains the data needed to access the DBs @@ -567,8 +578,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(meerkatDBs.get(0), syncQuery, MAX_RETRIES); + + scheduleWorker(worker, new RetryCallback<>(worker, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { // Remove all existing MIN_ENTRY filters and create new one that starts at 0 MessageFilterList.Builder filterListBuilder = filterList.toBuilder(); @@ -583,15 +602,19 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } filterListBuilder.addFilter(MessageFilter.newBuilder() .setType(FilterType.MIN_ENTRY) - .setEntry(0) + .setEntry(startEntry) .build()); // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), 1); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), MAX_RETRIES); - // Submit job and create callback - scheduleWorker(worker, new SubscriptionCallback(worker, messageHandler)); + // Submit job and create callback that retries on failure and handles repeated subscription + scheduleWorker(worker, new RetryCallback<>(worker, new SubscriptionCallback(worker, callback))); + } + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index d8f66f2..52e28e0 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -5,6 +5,7 @@ import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.multiserver.*; import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; @@ -55,7 +56,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple batchDigest = new GenericBatchDigest(digest); - minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount()); + minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * (float) clientParams.getBulletinBoardAddressCount()); executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM); @@ -223,9 +224,13 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } + /** + * This method is not supported by this class! + * This is because it has no meaning when considering more than one server without knowing which server will be contacted + */ @Override - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { - // TODO: Implement + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + callback.onFailure(new IllegalAccessError("querySync is not supported by this class")); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java index 10517f7..0401a76 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java @@ -2,6 +2,7 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.rest.Constants; @@ -11,6 +12,10 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; + import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; @@ -45,17 +50,19 @@ public class SingleServerGetRedundancyWorker extends SingleServerWorker inputStream = null; // Retrieve answer try { - // If a BulletinBoardMessageList is returned: the read was successful - BulletinBoardMessageList msgList = response.readEntity(BulletinBoardMessageList.class); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BulletinBoardMessage.class); - if (msgList.getMessageList().size() > 0){ + if (inputStream.asList().size() > 0){ // Message exists in the server return 1.0f; } @@ -64,14 +71,15 @@ public class SingleServerGetRedundancyWorker extends SingleServerWorker thrown; - private Random random; - - // Constructor - - public ThreadedBulletinBoardClientIntegrationTest(){ - - signers = new GenericBatchDigitalSignature[2]; - signerIDs = new ByteString[signers.length]; - signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); - signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); - - InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); - char[] password = KEYFILE_PASSWORD1.toCharArray(); - - KeyStore.Builder keyStoreBuilder; - try { - keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); - - signers[0].loadSigningCertificate(keyStoreBuilder); - - signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); - - keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); - password = KEYFILE_PASSWORD3.toCharArray(); - - keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); - signers[1].loadSigningCertificate(keyStoreBuilder); - - signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); - - for (int i = 0 ; i < signers.length ; i++) { - signerIDs[i] = signers[i].getSignerID(); - } - - } catch (IOException e) { - System.err.println("Failed reading from signature file " + e.getMessage()); - fail("Failed reading from signature file " + e.getMessage()); - } catch (CertificateException e) { - System.err.println("Failed reading certificate " + e.getMessage()); - fail("Failed reading certificate " + e.getMessage()); - } catch (KeyStoreException e) { - System.err.println("Failed reading keystore " + e.getMessage()); - fail("Failed reading keystore " + e.getMessage()); - } catch (NoSuchAlgorithmException e) { - System.err.println("Couldn't find signing algorithm " + e.getMessage()); - fail("Couldn't find signing algorithm " + e.getMessage()); - } catch (UnrecoverableKeyException e) { - System.err.println("Couldn't find signing key " + e.getMessage()); - fail("Couldn't find signing key " + e.getMessage()); - } - - } - - // Callback definitions - - protected void genericHandleFailure(Throwable t){ - System.err.println(t.getCause() + " " + t.getMessage()); - thrown.add(t); - jobSemaphore.release(); - } - - private class PostCallback implements FutureCallback{ - - private boolean isAssert; - private boolean assertValue; - - public PostCallback() { - this(false); - } - - public PostCallback(boolean isAssert) { - this(isAssert,true); - } - - public PostCallback(boolean isAssert, boolean assertValue) { - this.isAssert = isAssert; - this.assertValue = assertValue; - } - - @Override - public void onSuccess(Boolean msg) { - System.err.println("Post operation completed"); - jobSemaphore.release(); - //TODO: Change Assert mechanism to exception one - if (isAssert) { - if (assertValue) { - assertThat("Post operation failed", msg, is(Boolean.TRUE)); - } else { - assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE)); - } - } - } - - @Override - public void onFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private class RedundancyCallback implements FutureCallback{ - - private float minRedundancy; - - public RedundancyCallback(float minRedundancy) { - this.minRedundancy = minRedundancy; - } - - @Override - public void onSuccess(Float redundancy) { - System.err.println("Redundancy found is: " + redundancy); - jobSemaphore.release(); - assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); - } - - @Override - public void onFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private class ReadCallback implements FutureCallback>{ - - private List expectedMsgList; - - public ReadCallback(List expectedMsgList) { - this.expectedMsgList = expectedMsgList; - } - - @Override - public void onSuccess(List messages) { - - System.err.println(messages); - jobSemaphore.release(); - - BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); - - assertThat(messages.size(), is(expectedMsgList.size())); - - Iterator expectedMessageIterator = expectedMsgList.iterator(); - Iterator receivedMessageIterator = messages.iterator(); - - while (expectedMessageIterator.hasNext()) { - assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); - } - - } - - @Override - public void onFailure(Throwable t) { - genericHandleFailure(t); - } - } - - private class ReadBatchCallback implements FutureCallback { - - private CompleteBatch expectedBatch; - - public ReadBatchCallback(CompleteBatch expectedBatch) { - this.expectedBatch = expectedBatch; - } - - @Override - public void onSuccess(CompleteBatch batch) { - - System.err.println(batch); - jobSemaphore.release(); - - assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch))); - - } - - @Override - public void onFailure(Throwable t) { - genericHandleFailure(t); - } - } - - // Randomness generators - - private byte randomByte(){ - return (byte) random.nextInt(); - } - - private byte[] randomByteArray(int length) { - - byte[] randomBytes = new byte[length]; - - for (int i = 0; i < length ; i++){ - randomBytes[i] = randomByte(); - } - - return randomBytes; - - } - - private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException { - - CompleteBatch completeBatch = new CompleteBatch(); - - // Create data - - completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() - .setSignerId(signerIDs[signer]) - .setBatchId(batchId) - .addTag("Test") - .build()); - - for (int i = 0 ; i < length ; i++){ - - BatchData batchData = BatchData.newBuilder() - .setData(ByteString.copyFrom(randomByteArray(i))) - .build(); - - completeBatch.appendBatchData(batchData); - - } - - completeBatch.setTimestamp(Timestamp.newBuilder() - .setSeconds(Math.abs(90)) - .setNanos(50) - .build()); - - signers[signer].updateContent(completeBatch); - - completeBatch.setSignature(signers[signer].sign()); - - return completeBatch; - - } - - // Test methods - - /** - * Takes care of initializing the client and the test resources - */ - @Before - public void init(){ - - bulletinBoardClient = new ThreadedBulletinBoardClient(); - - random = new Random(0); // We use insecure randomness in tests for repeatability - - List testDB = new LinkedList<>(); - testDB.add(BASE_URL); - - bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() - .addAllBulletinBoardAddress(testDB) - .setMinRedundancy((float) 1.0) - .build()); - - postCallback = new PostCallback(); - redundancyCallback = new RedundancyCallback((float) 1.0); - - thrown = new Vector<>(); - jobSemaphore = new Semaphore(0); - - } - - /** - * Closes the client and makes sure the test fails when an exception occurred in a separate thread - */ - - @After - public void close() { - - bulletinBoardClient.close(); - - if (thrown.size() > 0) { - assert false; - } - - } - - /** - * Tests the standard post, redundancy and read methods - */ - @Test - public void postTest() { - - byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; - byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; - byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24}; - - BulletinBoardMessage msg; - - MessageFilterList filterList; - List msgList; - - MessageID messageID; - - msg = BulletinBoardMessage.newBuilder() - .setMsg(UnsignedBulletinBoardMessage.newBuilder() - .addTag("Signature") - .addTag("Trustee") - .setData(ByteString.copyFrom(b1)) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(20) - .setNanos(30) - .build()) - .build()) - .addSig(Crypto.Signature.newBuilder() - .setType(Crypto.SignatureType.DSA) - .setData(ByteString.copyFrom(b2)) - .setSignerId(ByteString.copyFrom(b3)) - .build()) - .addSig(Crypto.Signature.newBuilder() - .setType(Crypto.SignatureType.ECDSA) - .setData(ByteString.copyFrom(b3)) - .setSignerId(ByteString.copyFrom(b2)) - .build()) - .build(); - - messageID = bulletinBoardClient.postMessage(msg,postCallback); - - try { - jobSemaphore.acquire(); - } catch (InterruptedException e) { - System.err.println(e.getCause() + " " + e.getMessage()); - } - - bulletinBoardClient.getRedundancy(messageID,redundancyCallback); - - filterList = MessageFilterList.newBuilder() - .addFilter( - MessageFilter.newBuilder() - .setType(FilterType.TAG) - .setTag("Signature") - .build() - ) - .addFilter( - MessageFilter.newBuilder() - .setType(FilterType.TAG) - .setTag("Trustee") - .build() - ) - .build(); - - msgList = new LinkedList<>(); - msgList.add(msg); - - readCallback = new ReadCallback(msgList); - - bulletinBoardClient.readMessages(filterList, readCallback); - try { - jobSemaphore.acquire(2); - } catch (InterruptedException e) { - System.err.println(e.getCause() + " " + e.getMessage()); - } - - } - - /** - * Tests posting a batch by parts - * Also tests not being able to post to a closed batch - * @throws CommunicationException, SignatureException, InterruptedException - */ - @Test - public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { - - final int SIGNER = 1; - final int BATCH_ID = 100; - final int BATCH_LENGTH = 100; - - CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); - - // Begin batch - - bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback); - - jobSemaphore.acquire(); - - // Post data - - bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback); - - jobSemaphore.acquire(); - - // Close batch - - CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() - .setBatchId(BATCH_ID) - .setBatchLength(BATCH_LENGTH) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(50) - .setNanos(80) - .build()) - .setSig(completeBatch.getSignature()) - .build(); - - bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); - - jobSemaphore.acquire(); - - // Attempt to open batch again - - bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback); - - // Attempt to add batch data - - bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback); - - jobSemaphore.acquire(2); - - // Read batch data - - BatchSpecificationMessage batchSpecificationMessage = - BatchSpecificationMessage.newBuilder() - .setSignerId(signerIDs[SIGNER]) - .setBatchId(BATCH_ID) - .setStartPosition(0) - .build(); - - readBatchCallback = new ReadBatchCallback(completeBatch); - - bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); - - jobSemaphore.acquire(); - - } - - /** - * Posts a complete batch message - * Checks reading od the message - * @throws CommunicationException, SignatureException, InterruptedException - */ - @Test - public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { - - final int SIGNER = 0; - final int BATCH_ID = 101; - final int BATCH_LENGTH = 50; - - // Post batch - - CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); - - bulletinBoardClient.postBatch(completeBatch,postCallback); - - jobSemaphore.acquire(); - - // Read batch - - BatchSpecificationMessage batchSpecificationMessage = - BatchSpecificationMessage.newBuilder() - .setSignerId(signerIDs[SIGNER]) - .setBatchId(BATCH_ID) - .setStartPosition(0) - .build(); - - readBatchCallback = new ReadBatchCallback(completeBatch); - - bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); - - jobSemaphore.acquire(); - - } - - /** - * Tests that an unopened batch cannot be closed - * @throws CommunicationException, InterruptedException - */ - @Test - public void testInvalidBatchClose() throws CommunicationException, InterruptedException { - - final int NON_EXISTENT_BATCH_ID = 999; - - CloseBatchMessage closeBatchMessage = - CloseBatchMessage.newBuilder() - .setBatchId(NON_EXISTENT_BATCH_ID) - .setBatchLength(1) - .setSig(Crypto.Signature.getDefaultInstance()) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(9) - .setNanos(12) - .build()) - .build(); - - // Try to close the (unopened) batch; - - bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback); - - jobSemaphore.acquire(); - - } - -} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java new file mode 100644 index 0000000..2e0e0af --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -0,0 +1,107 @@ +package meerkat.bulletinboard; + +import meerkat.bulletinboard.sqlserver.*; +import meerkat.comm.CommunicationException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class LocalBulletinBoardClientTest { + + private static final int THREAD_NUM = 3; + private static final String DB_NAME = "TestDB"; + + // Tester + private GenericBulletinBoardClientTester clientTest; + + public LocalBulletinBoardClientTest() throws CommunicationException { + + H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME) ; + + try { + + Connection conn = queryProvider.getDataSource().getConnection(); + Statement stmt = conn.createStatement(); + + List deletionQueries = queryProvider.getSchemaDeletionCommands(); + + for (String deletionQuery : deletionQueries) { + stmt.execute(deletionQuery); + } + + } catch (SQLException e) { + System.err.println(e.getMessage()); + throw new CommunicationException(e.getCause() + " " + e.getMessage()); + } + + BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); + server.init(DB_NAME); + + LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM); + + clientTest = new GenericBulletinBoardClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void postTest() { + + clientTest.postTest(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + clientTest.testInvalidBatchClose(); + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index c7bcf57..ab82ab1 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -5,6 +5,7 @@ import java.util.*; import com.google.protobuf.*; +import com.google.protobuf.Timestamp; import meerkat.bulletinboard.*; import meerkat.bulletinboard.sqlserver.mappers.*; import static meerkat.bulletinboard.BulletinBoardConstants.*; @@ -26,6 +27,7 @@ import javax.sql.DataSource; import meerkat.util.BulletinBoardUtils; import meerkat.util.TimestampComparator; +import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.support.GeneratedKeyHolder; @@ -586,14 +588,12 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } - /** - * Used to retrieve just basic information about messages to allow calculation of checksum + * Private implementation of the message stub reader for returning result as a list * @param filterList is a filter list that defines which messages the client is interested in - * @return a list of Bulletin Board Messages that contain just the entry number, timestamp and message ID for each message - * The message ID is returned inside the message data field + * @return the requested list of message stubs */ - protected List readMessageStubs(MessageFilterList filterList) { + private List readMessageStubs(MessageFilterList filterList) { StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1)); @@ -635,6 +635,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + /** * This method returns a string representation of the tag associated with a batch ID * @param batchId is the given batch ID @@ -644,6 +645,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BATCH_ID_TAG_PREFIX + Integer.toString(batchId); } + /** * This method checks if a specified batch exists and is already closed * @param signerId is the ID of the publisher of the batch @@ -687,6 +689,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public BoolMsg beginBatch(BeginBatchMessage message) throws CommunicationException { @@ -719,8 +722,10 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ jdbcTemplate.batchUpdate(sql,namedParameters); return BoolMsg.newBuilder().setValue(true).build(); + } + @Override public BoolMsg postBatchMessage(BatchMessage batchMessage) throws CommunicationException{ @@ -744,6 +749,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public BoolMsg closeBatchMessage(CloseBatchMessage message) throws CommunicationException { @@ -767,13 +773,12 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BoolMsg.newBuilder().setValue(false).build(); } - // Get Tags and add them to CompleteBatch sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_TAGS); namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(0),signerId); + namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(0),signerId.toByteArray()); namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(1),batchId); List tags = jdbcTemplate.query(sql, namedParameters, new StringMapper()); @@ -847,6 +852,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BoolMsg.newBuilder().setValue(true).build(); } + @Override public void readBatch(BatchSpecificationMessage message, MessageOutputStream out) throws CommunicationException, IllegalArgumentException{ @@ -866,6 +872,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + /** * Finds the entry number of the last entry in the database * @return the entry number, or -1 if no entries are found @@ -884,10 +891,80 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) { + + if (generateSyncQueryParams == null + || !generateSyncQueryParams.hasFilterList() + || generateSyncQueryParams.getFilterList().getFilterCount() <= 0 + || generateSyncQueryParams.getBreakpointListCount() <= 0){ + + return SyncQuery.getDefaultInstance(); + + } + + List messages = readMessageStubs(generateSyncQueryParams.getFilterList()); + + if (messages.size() <= 0){ + return SyncQuery.newBuilder().build(); + } + + SyncQuery.Builder resultBuilder = SyncQuery.newBuilder(); + + Iterator messageIterator = messages.iterator(); + Iterator breakpointIterator = generateSyncQueryParams.getBreakpointListList().iterator(); + + Checksum checksum = new SimpleChecksum(); + checksum.setDigest(new SHA256Digest()); + + Timestamp lastTimestamp = Timestamp.getDefaultInstance(); + BulletinBoardMessage message = messageIterator.next(); + long currentMessageNum = 1; + + boolean checksumChanged = true; + + while (breakpointIterator.hasNext()){ + + Float breakpoint = breakpointIterator.next(); + + // Continue while breakpoint not reached, or it has been reached but no new timestamp has been encountered since + while ( messageIterator.hasNext() + && ((float) currentMessageNum / (float) messages.size() <= breakpoint) + || ((float) currentMessageNum / (float) messages.size() > breakpoint + && lastTimestamp.equals(message.getMsg().getTimestamp()))){ + + checksumChanged = true; + + checksum.update(message.getMsg().getData()); + + lastTimestamp = message.getMsg().getTimestamp(); + message = messageIterator.next(); + + } + + if (checksumChanged) { + + checksum.update(message.getMsg().getData()); + resultBuilder.addQuery(SingleSyncQuery.newBuilder() + .setTimeOfSync(message.getMsg().getTimestamp()) + .setChecksum(checksum.getChecksum()) + .build()); + + } + + checksumChanged = false; + + } + + return resultBuilder.build(); + + } + + /** * Searches for the latest time of sync of the DB relative to a given query and returns the metadata needed to complete the sync - * The checksum up to (and including) each given timestamp is calculated using bitwise XOR on 8-byte sized blocks of the message IDs - * @param syncQuery contains a succinct representation of states to compare to + * The checksum up to (and including) each given timestamp is calculated using an instance of SimpleChecksum + * @param syncQuery contains a succinct representation of states to compare against * @return the current last entry num and latest time of sync if there is one; -1 as last entry and empty timestamp otherwise * @throws CommunicationException */ @@ -960,6 +1037,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public void close() {} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java index bdba241..71ba742 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java @@ -20,9 +20,9 @@ import java.util.List; */ public class MessageCallbackHandler implements RowCallbackHandler { - NamedParameterJdbcTemplate jdbcTemplate; - SQLQueryProvider sqlQueryProvider; - MessageOutputStream out; + private final NamedParameterJdbcTemplate jdbcTemplate; + private final SQLQueryProvider sqlQueryProvider; + private final MessageOutputStream out; public MessageCallbackHandler(NamedParameterJdbcTemplate jdbcTemplate, SQLQueryProvider sqlQueryProvider, MessageOutputStream out) { diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java new file mode 100644 index 0000000..f81cc76 --- /dev/null +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java @@ -0,0 +1,59 @@ +package meerkat.bulletinboard.sqlserver.mappers; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider.QueryType; +import meerkat.comm.MessageOutputStream; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.UnsignedBulletinBoardMessage; +import meerkat.protobuf.Crypto; +import meerkat.util.BulletinBoardUtils; +import org.springframework.jdbc.core.RowCallbackHandler; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 21-Feb-16. + */ +public class MessageStubCallbackHandler implements RowCallbackHandler { + + private final MessageOutputStream out; + + public MessageStubCallbackHandler(MessageOutputStream out) { + + this.out = out; + + } + + @Override + public void processRow(ResultSet rs) throws SQLException { + + BulletinBoardMessage result; + + result = BulletinBoardMessage.newBuilder() + .setEntryNum(rs.getLong(1)) + .setMsg(UnsignedBulletinBoardMessage.newBuilder() + .setData(ByteString.copyFrom(rs.getBytes(2))) + .setTimestamp(BulletinBoardUtils.toTimestampProto(rs.getTimestamp(3))) + .build()) + .build(); + + try { + + out.writeMessage(result); + + } catch (IOException e) { + + //TODO: log + e.printStackTrace(); + + } + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index 2e4782f..7c0f7fa 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -22,7 +22,7 @@ import static meerkat.rest.Constants.*; import java.io.IOException; import java.io.OutputStream; -import java.util.List; +import java.util.Collection; /** * An implementation of the BulletinBoardServer which functions as a WebApp @@ -183,6 +183,21 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } } + @Path(GENERATE_SYNC_QUERY_PATH) + @POST + @Consumes(MEDIATYPE_PROTOBUF) + @Produces(MEDIATYPE_PROTOBUF) + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + try { + init(); + return bulletinBoard.generateSyncQuery(generateSyncQueryParams); + } catch (CommunicationException | IllegalArgumentException e) { + System.err.println(e.getMessage()); + return null; + } + } + @Path(READ_BATCH_PATH) @POST @Consumes(MEDIATYPE_PROTOBUF) diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index c6e330c..d74c1ea 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -11,10 +11,6 @@ import java.util.List; */ public interface AsyncBulletinBoardClient extends BulletinBoardClient { - public interface MessageHandler { - void handleNewMessages(List messageList); - } - /** * Post a message to the bulletin board in an asynchronous manner * @param msg is the message to be posted @@ -100,11 +96,12 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { */ public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback); + /** - * Subscribes to a notifier that will return any new messages on the server that match the given filters - * @param filterList defines the set of filters for message retrieval - * @param messageHandler defines the handler for new messages received + * Perform a Sync Query on the bulletin board + * @param syncQuery defines the query + * @param callback is a callback for handling the result of the query */ - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler); + public void querySync(SyncQuery syncQuery, FutureCallback callback); } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 2f5a3df..245eddf 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -5,6 +5,7 @@ import meerkat.protobuf.Voting.*; import static meerkat.protobuf.BulletinBoardAPI.*; +import java.util.Collection; import java.util.List; /** @@ -26,8 +27,6 @@ public interface BulletinBoardClient { */ MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException; - - /** * Check how "safe" a given message is in a synchronous manner * @param id is the unique message identifier for retrieval @@ -40,11 +39,21 @@ public interface BulletinBoardClient { * Note that if messages haven't been "fully posted", this might return a different * set of messages in different calls. However, messages that are fully posted * are guaranteed to be included. - * @param filterList return only messages that match the filters (null means no filtering). + * @param filterList return only messages that match the filters (null means no filtering) * @return the list of messages */ List readMessages(MessageFilterList filterList); + /** + * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list + * Should only be called on instances for which the actual server contacted is known (i.e. there is only one server) + * @param GenerateSyncQueryParams defines the required information needed to generate the query + * These are represented as fractions of the total number of relevant messages + * @return The generated SyncQuery + * @throws CommunicationException when no DB can be contacted + */ + SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException; + /** * Closes all connections, if any. * This is done in a synchronous (blocking) way. diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java index 66652f8..9ddee90 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java @@ -8,6 +8,7 @@ public interface BulletinBoardConstants { // Relative addresses for Bulletin Board operations public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver"; + public static final String GENERATE_SYNC_QUERY_PATH = "/generatesyncquery"; public static final String READ_MESSAGES_PATH = "/readmessages"; public static final String READ_BATCH_PATH = "/readbatch"; public static final String POST_MESSAGE_PATH = "/postmessage"; diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index 0b279c2..e458dbc 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -4,6 +4,8 @@ import meerkat.comm.CommunicationException; import meerkat.comm.MessageOutputStream; import meerkat.protobuf.BulletinBoardAPI.*; +import java.util.Collection; + /** * Created by Arbel on 07/11/15. @@ -28,7 +30,7 @@ public interface BulletinBoardServer{ * @throws CommunicationException on DB connection error */ public BoolMsg postMessage(BulletinBoardMessage msg) throws CommunicationException; - + /** * Read all messages posted matching the given filter * @param filterList return only messages that match the filters (empty list or null means no filtering) @@ -77,6 +79,13 @@ public interface BulletinBoardServer{ */ public void readBatch(BatchSpecificationMessage message, MessageOutputStream out) throws CommunicationException, IllegalArgumentException; + /** + * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list + * @param generateSyncQueryParams defines the information needed to generate the query + * @return The generated SyncQuery + * @throws CommunicationException on DB connection error + */ + SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException; /** * Queries the database for sync status with respect to a given sync query diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java new file mode 100644 index 0000000..85eb2cc --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java @@ -0,0 +1,32 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * This interface defines the behaviour required from a subscription service to Bulletin Board messages + */ +public interface BulletinBoardSubscriber { + + /** + * Subscribes to a notifier that will return any new messages on the server that match the given filters + * In case of communication error: the subscription is terminated + * @param filterList defines the set of filters for message retrieval + * @param callback defines how to handle new messages received and/or a failures in communication + */ + public void subscribe(MessageFilterList filterList, FutureCallback> callback); + + /** + * Subscribes to a notifier that will return any new messages on the server that match the given filters + * In case of communication error: the subscription is terminated + * @param filterList defines the set of filters for message retrieval + * @param startEntry defines the first entry number to consider + * @param callback defines how to handle new messages received and/or a failures in communication + */ + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback); + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java new file mode 100644 index 0000000..4b07225 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java @@ -0,0 +1,22 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 08-Mar-16. + * This interface defines the behaviour of a bulletin board synchronizer + * This is used to make sure that data in a specific instance of a bulletin board server is duplicated to a sufficient percentage of the other servers + */ +public interface BulletinBoardSynchronizer extends Runnable{ + + /** + * + * @param localClient is a client for the local DB instance + * @param remoteClient is a client for the remote DBs + * @param minRedundancy + */ + public void init(BulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, float minRedundancy); + + @Override + public void run(); + + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java index 14e87e7..649fd8b 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java @@ -139,7 +139,12 @@ public class CompleteBatch { @Override public String toString() { + + if (beginBatchMessage == null || beginBatchMessage.getSignerId() == null) + return "Unspecified batch " + super.toString(); + return "Batch " + beginBatchMessage.getSignerId().toString() + ":" + beginBatchMessage.getBatchId(); + } } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java new file mode 100644 index 0000000..b07e655 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java @@ -0,0 +1,7 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + */ +public interface SubscriptionAsyncBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber { +} diff --git a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto index fd95503..1136e16 100644 --- a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto +++ b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto @@ -146,6 +146,18 @@ message SyncQuery { } +// This message defines the required information for generation of a SyncQuery instance by the server +message GenerateSyncQueryParams { + + // Defines the set of messages required + MessageFilterList filterList = 1; + + // Defines the locations in the list of messages to calculate single sync queries for + // The values should be between 0.0 and 1.0 and define the location in fractions of the size of the message set + repeated float breakpointList = 2; + +} + // This message defines the server's response format to a sync query message SyncQueryResponse {