diff --git a/bulletin-board-client/build.gradle b/bulletin-board-client/build.gradle index 9691016..8fdeba0 100644 --- a/bulletin-board-client/build.gradle +++ b/bulletin-board-client/build.gradle @@ -24,7 +24,7 @@ ext { nexusPassword = project.hasProperty('nexusPassword') ? project.property('nexusPassword') : "" } -description = "Meerkat Bulletin Board Client implementation" +description = "Meerkat Voting Common Library" // Your project version version = "0.0" @@ -55,6 +55,12 @@ dependencies { compile 'org.bouncycastle:bcprov-jdk15on:1.53' compile 'org.bouncycastle:bcpkix-jdk15on:1.53' + // Depend on test resources from meerkat-common + testCompile project(path: ':meerkat-common', configuration: 'testOutput') + + // Depend on server compilation for the non-integration tests + testCompile project(path: ':bulletin-board-server') + testCompile 'junit:junit:4.+' testCompile 'org.hamcrest:hamcrest-all:1.3' @@ -63,6 +69,7 @@ dependencies { test { exclude '**/*IntegrationTest*' + outputs.upToDateWhen { false } } task integrationTest(type: Test) { diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java new file mode 100644 index 0000000..96ba76d --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -0,0 +1,168 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * This is a full-fledged implementation of a Bulletin Board Client + * It provides asynchronous access to several remote servers, as well as a local cache + * Read/write operations are performed on the local server + * After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated + * The database also employs a synchronizer which makes sure local data is sent to the remote servers + */ +public class CachedBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { + + private final BulletinBoardClient localClient; + private AsyncBulletinBoardClient remoteClient; + private BulletinBoardSubscriber subscriber; + + private final int threadPoolSize; + private final long failDelayInMilliseconds; + private final long subscriptionIntervalInMilliseconds; + + public CachedBulletinBoardClient(BulletinBoardClient localClient, + int threadPoolSize, + long failDelayInMilliseconds, + long subscriptionIntervalInMilliseconds) + throws IllegalAccessException, InstantiationException { + + this.localClient = localClient; + this.threadPoolSize = threadPoolSize; + this.failDelayInMilliseconds = failDelayInMilliseconds; + this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; + + remoteClient = new ThreadedBulletinBoardClient(); + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { + return null; + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { + return null; + } + + @Override + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { + + } + + @Override + public void getRedundancy(MessageID id, FutureCallback callback) { + + } + + @Override + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { + + } + + @Override + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { + + } + + @Override + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + + } + + @Override + public void init(BulletinBoardClientParams clientParams) { + + remoteClient.init(clientParams); + + ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); + + List subscriberClients = new ArrayList<>(clientParams.getBulletinBoardAddressCount()); + + for (String address : clientParams.getBulletinBoardAddressList()){ + + SubscriptionAsyncBulletinBoardClient newClient = + new SingleServerBulletinBoardClient(executorService, failDelayInMilliseconds, subscriptionIntervalInMilliseconds); + + newClient.init(clientParams.toBuilder().clearBulletinBoardAddress().addBulletinBoardAddress(address).build()); + + subscriberClients.add(newClient); + + } + + subscriber = new ThreadedBulletinBoardSubscriber(subscriberClients, localClient); + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + return null; + } + + @Override + public float getRedundancy(MessageID id) { + return 0; + } + + @Override + public List readMessages(MessageFilterList filterList) { + return null; + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { + return null; + } + + @Override + public void close() { + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java new file mode 100644 index 0000000..df3e196 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -0,0 +1,531 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.*; +import com.google.protobuf.ByteString; +import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; +import meerkat.comm.MessageInputStream.MessageInputStreamFactory; +import meerkat.comm.MessageOutputStream; +import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting.*; +import meerkat.util.BulletinBoardUtils; + +import javax.ws.rs.NotFoundException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Created by Arbel Deutsch Peled on 15-Mar-16. + * This client is to be used mainly for testing. + * It wraps a BulletinBoardServer in an asynchronous client. + * This means the access to the server is direct (via method calls) instead of through a TCP connection. + * The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously. + */ +public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient{ + + private final BulletinBoardServer server; + private final ListeningScheduledExecutorService executorService; + private final BatchDigest digest; + private final int subsrciptionDelay; + + /** + * Initializes an instance of the client + * @param server an initialized Bulletin Board Server instance which will perform the actual processing of the requests + * @param threadNum is the number of concurrent threads to allocate for the client + * @param subscriptionDelay is the required delay between subscription calls in milliseconds + */ + public LocalBulletinBoardClient(BulletinBoardServer server, int threadNum, int subscriptionDelay) { + this.server = server; + this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum)); + this.digest = new GenericBatchDigest(new SHA256Digest()); + this.subsrciptionDelay = subscriptionDelay; + } + + private class MessagePoster implements Callable { + + private final BulletinBoardMessage msg; + + public MessagePoster(BulletinBoardMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.postMessage(msg).getValue(); + } + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { + + Futures.addCallback(executorService.submit(new MessagePoster(msg)), callback); + + digest.update(msg.getMsg()); + return digest.digestAsMessageID(); + + } + + private class CompleteBatchPoster implements Callable { + + private final CompleteBatch completeBatch; + + public CompleteBatchPoster(CompleteBatch completeBatch) { + this.completeBatch = completeBatch; + } + + + @Override + public Boolean call() throws Exception { + + if (!server.beginBatch(completeBatch.getBeginBatchMessage()).getValue()) + return false; + + int i=0; + for (BatchData data : completeBatch.getBatchDataList()){ + + BatchMessage message = BatchMessage.newBuilder() + .setSignerId(completeBatch.getSignature().getSignerId()) + .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) + .setSerialNum(i) + .setData(data) + .build(); + + if (!server.postBatchMessage(message).getValue()) + return false; + + i++; + } + + return server.closeBatchMessage(completeBatch.getCloseBatchMessage()).getValue(); + } + + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { + + Futures.addCallback(executorService.schedule(new CompleteBatchPoster(completeBatch), subsrciptionDelay, TimeUnit.MILLISECONDS), callback); + + digest.update(completeBatch); + return digest.digestAsMessageID(); + + } + + private class BatchBeginner implements Callable { + + private final BeginBatchMessage msg; + + public BatchBeginner(BeginBatchMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.beginBatch(msg).getValue(); + } + + } + + @Override + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchBeginner(beginBatchMessage)), callback); + } + + private class BatchDataPoster implements Callable { + + private final ByteString signerId; + private final int batchId; + private final List batchDataList; + private final int startPosition; + + public BatchDataPoster(ByteString signerId, int batchId, List batchDataList, int startPosition) { + this.signerId = signerId; + this.batchId = batchId; + this.batchDataList = batchDataList; + this.startPosition = startPosition; + } + + + @Override + public Boolean call() throws Exception { + + BatchMessage.Builder msgBuilder = BatchMessage.newBuilder() + .setSignerId(signerId) + .setBatchId(batchId); + + int i = startPosition; + for (BatchData data : batchDataList){ + + msgBuilder.setSerialNum(i) + .setData(data); + + if (!server.postBatchMessage(msgBuilder.build()).getValue()) + return false; + + i++; + + } + + return true; + + } + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback); + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { + postBatchData(signerId, batchId, batchDataList, 0, callback); + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchDataPoster(signerId, batchId, batchDataList, startPosition)), callback); + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { + postBatchData(signerId, batchId, batchDataList, 0, callback); + } + + private class BatchCloser implements Callable { + + private final CloseBatchMessage msg; + + public BatchCloser(CloseBatchMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.closeBatchMessage(msg).getValue(); + } + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchCloser(closeBatchMessage)), callback); + } + + private class RedundancyGetter implements Callable { + + private final MessageID msgId; + + public RedundancyGetter(MessageID msgId) { + this.msgId = msgId; + } + + + @Override + public Float call() throws Exception { + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgId.getID()) + .build()) + .build(); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream outputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList,outputStream); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + if (inputStream.isAvailable()) + return 1.0f; + else + return 0.0f; + + } + + } + + @Override + public void getRedundancy(MessageID id, FutureCallback callback) { + Futures.addCallback(executorService.submit(new RedundancyGetter(id)), callback); + } + + private class MessageReader implements Callable> { + + private final MessageFilterList filterList; + + public MessageReader(MessageFilterList filterList) { + this.filterList = filterList; + } + + + @Override + public List call() throws Exception { + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream outputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList, outputStream); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + return inputStream.asList(); + + } + + } + + @Override + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { + Futures.addCallback(executorService.submit(new MessageReader(filterList)), callback); + } + + class SubscriptionCallback implements FutureCallback> { + + private MessageFilterList filterList; + private final FutureCallback> callback; + + public SubscriptionCallback(MessageFilterList filterList, FutureCallback> callback) { + this.filterList = filterList; + this.callback = callback; + } + + @Override + public void onSuccess(List result) { + + // Report new messages to user + callback.onSuccess(result); + + MessageFilterList.Builder filterBuilder = filterList.toBuilder(); + + // If any new messages arrived: update the MIN_ENTRY condition + if (result.size() > 0) { + + // Remove last filter from list (MIN_ENTRY one) + filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + + // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) + filterBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(result.get(result.size() - 1).getEntryNum() + 1) + .build()); + + } + + filterList = filterBuilder.build(); + + // Reschedule job + Futures.addCallback(executorService.submit(new MessageReader(filterList)), this); + + } + + @Override + public void onFailure(Throwable t) { + + // Notify caller about failure and terminate subscription + callback.onFailure(t); + + } + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + MessageFilterList subscriptionFilterList = + filterList.toBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(startEntry) + .build()) + .build(); + + Futures.addCallback(executorService.submit(new MessageReader(subscriptionFilterList)), new SubscriptionCallback(subscriptionFilterList, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); + } + + private class CompleteBatchReader implements Callable { + + private final BatchSpecificationMessage batchSpecificationMessage; + + public CompleteBatchReader(BatchSpecificationMessage batchSpecificationMessage) { + this.batchSpecificationMessage = batchSpecificationMessage; + } + + + @Override + public CompleteBatch call() throws Exception { + + final String[] TAGS_TO_REMOVE = {BulletinBoardConstants.BATCH_TAG, BulletinBoardConstants.BATCH_ID_TAG_PREFIX}; + + CompleteBatch completeBatch = new CompleteBatch(BeginBatchMessage.newBuilder() + .setSignerId(batchSpecificationMessage.getSignerId()) + .setBatchId(batchSpecificationMessage.getBatchId()) + .build()); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream batchOutputStream = new MessageOutputStream<>(byteOutputStream); + server.readBatch(batchSpecificationMessage,batchOutputStream); + + MessageInputStream batchInputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BatchData.class); + + completeBatch.appendBatchData(batchInputStream.asList()); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_TAG) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + completeBatch.getBeginBatchMessage().getBatchId()) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(completeBatch.getBeginBatchMessage().getSignerId()) + .build()) + .build(); + + byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream messageOutputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList,messageOutputStream); + + MessageInputStream messageInputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + if (!messageInputStream.isAvailable()) + throw new NotFoundException("Batch does not exist"); + + BulletinBoardMessage message = messageInputStream.readMessage(); + + completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() + .addAllTag(BulletinBoardUtils.removePrefixTags(message, Arrays.asList(TAGS_TO_REMOVE))) + .setSignerId(message.getSig(0).getSignerId()) + .setBatchId(Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX))) + .build()); + + completeBatch.setSignature(message.getSig(0)); + completeBatch.setTimestamp(message.getMsg().getTimestamp()); + + return completeBatch; + + } + + } + + @Override + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new CompleteBatchReader(batchSpecificationMessage)), callback); + } + + private class SyncQueryHandler implements Callable { + + private final SyncQuery syncQuery; + + public SyncQueryHandler(SyncQuery syncQuery) { + this.syncQuery = syncQuery; + } + + + @Override + public SyncQueryResponse call() throws Exception { + return server.querySync(syncQuery); + } + + } + + @Override + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + Futures.addCallback(executorService.submit(new SyncQueryHandler(syncQuery)), callback); + } + + /** + * This method is a stub, since the implementation only considers one server, and that is given in the constructor + * @param ignored is ignored + */ + @Override + public void init(BulletinBoardClientParams ignored) {} + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + + try { + + MessagePoster poster = new MessagePoster(msg); + poster.call(); + + digest.update(msg); + return digest.digestAsMessageID(); + + } catch (Exception e) { + return null; + } + + } + + @Override + public float getRedundancy(MessageID id) { + + try { + + RedundancyGetter getter = new RedundancyGetter(id); + return getter.call(); + + } catch (Exception e) { + return -1.0f; + } + + } + + @Override + public List readMessages(MessageFilterList filterList) { + + try { + + MessageReader reader = new MessageReader(filterList); + return reader.call(); + + } catch (Exception e){ + return null; + } + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { + return server.generateSyncQuery(GenerateSyncQueryParams); + } + + @Override + public void close() { + try { + server.close(); + } catch (CommunicationException ignored) {} + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 633e495..2ed113a 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -1,13 +1,21 @@ package meerkat.bulletinboard; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.crypto.Digest; import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; import meerkat.rest.*; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import javax.ws.rs.client.Client; @@ -129,6 +137,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ */ @Override public List readMessages(MessageFilterList filterList) { + WebTarget webTarget; Response response; BulletinBoardMessageList messageList; @@ -139,6 +148,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ } for (String db : meerkatDBs) { + try { webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); @@ -151,9 +161,34 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ } } catch (Exception e) {} + } return null; + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + + WebTarget webTarget; + Response response; + + for (String db : meerkatDBs) { + + try { + webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(GENERATE_SYNC_QUERY_PATH); + + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(generateSyncQueryParams, Constants.MEDIATYPE_PROTOBUF)); + + return response.readEntity(SyncQuery.class); + + } catch (Exception e) {} + + } + + throw new CommunicationException("Could not contact any server"); + } public void close() { diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 24cd6bf..34531cf 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -7,6 +7,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.util.BulletinBoardUtils; @@ -29,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; * If the list of servers contains more than one server: the server actually used is the first one * The class further implements a delayed access to the server after a communication error occurs */ -public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient { +public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { private final int MAX_RETRIES = 11; @@ -275,13 +276,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i class SubscriptionCallback implements FutureCallback> { private SingleServerReadMessagesWorker worker; - private final MessageHandler messageHandler; + private final FutureCallback> callback; private MessageFilterList.Builder filterBuilder; - public SubscriptionCallback(SingleServerReadMessagesWorker worker, MessageHandler messageHandler) { + public SubscriptionCallback(SingleServerReadMessagesWorker worker, FutureCallback> callback) { this.worker = worker; - this.messageHandler = messageHandler; + this.callback = callback; filterBuilder = worker.getPayload().toBuilder(); } @@ -290,7 +291,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void onSuccess(List result) { // Report new messages to user - messageHandler.handleNewMessages(result); + callback.onSuccess(result); // Remove last filter from list (MIN_ENTRY one) filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); @@ -315,14 +316,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Notify client about failure fail(); - // Reschedule exact same task - scheduleWorker(worker, this); + // Notify caller about failure and terminate subscription + callback.onFailure(t); } } - public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) { + public SingleServerBulletinBoardClient(ListeningScheduledExecutorService executorService, + long failDelayInMilliseconds, + long subscriptionIntervalInMilliseconds) { - executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); + this.executorService = executorService; this.failDelayInMilliseconds = failDelayInMilliseconds; this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; @@ -332,6 +335,14 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } + public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) { + + this(MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)), + failDelayInMilliseconds, + subscriptionIntervalInMilliseconds); + + } + /** * Stores database location, initializes the web Client and * @param clientParams contains the data needed to access the DBs @@ -567,8 +578,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } @Override - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(meerkatDBs.get(0), syncQuery, MAX_RETRIES); + + scheduleWorker(worker, new RetryCallback<>(worker, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { // Remove all existing MIN_ENTRY filters and create new one that starts at 0 MessageFilterList.Builder filterListBuilder = filterList.toBuilder(); @@ -583,15 +602,19 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } filterListBuilder.addFilter(MessageFilter.newBuilder() .setType(FilterType.MIN_ENTRY) - .setEntry(0) + .setEntry(startEntry) .build()); // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), 1); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), MAX_RETRIES); - // Submit job and create callback - scheduleWorker(worker, new SubscriptionCallback(worker, messageHandler)); + // Submit job and create callback that retries on failure and handles repeated subscription + scheduleWorker(worker, new RetryCallback<>(worker, new SubscriptionCallback(worker, callback))); + } + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index d8f66f2..52e28e0 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -5,6 +5,7 @@ import com.google.protobuf.ByteString; import meerkat.bulletinboard.workers.multiserver.*; import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; @@ -55,7 +56,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple batchDigest = new GenericBatchDigest(digest); - minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount()); + minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * (float) clientParams.getBulletinBoardAddressCount()); executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM); @@ -223,9 +224,13 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple } + /** + * This method is not supported by this class! + * This is because it has no meaning when considering more than one server without knowing which server will be contacted + */ @Override - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { - // TODO: Implement + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + callback.onFailure(new IllegalAccessError("querySync is not supported by this class")); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java new file mode 100644 index 0000000..cf8d47d --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -0,0 +1,272 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.Timestamp; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardUtils; + +import static meerkat.protobuf.BulletinBoardAPI.FilterType.*; + +import java.sql.Time; +import java.util.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * A multi-server implementation of the {@link BulletinBoardSubscriber} + */ +public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber { + + protected final Collection clients; + protected final BulletinBoardClient localClient; + + protected Iterator clientIterator; + protected SubscriptionAsyncBulletinBoardClient currentClient; + + private long lastServerSwitchTime; + + private AtomicBoolean isSyncInProgress; + private Semaphore rescheduleSemaphore; + + private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f}; + + public ThreadedBulletinBoardSubscriber(Collection clients, BulletinBoardClient localClient) { + + this.clients = clients; + this.localClient = localClient; + + lastServerSwitchTime = System.currentTimeMillis(); + + clientIterator = clients.iterator(); + currentClient = clientIterator.next(); + + isSyncInProgress = new AtomicBoolean(false); + rescheduleSemaphore = new Semaphore(1); + + } + + /** + * Moves to next client and performs resync with it + */ + private void nextClient() { + + try { + + rescheduleSemaphore.acquire(); + + if (!clientIterator.hasNext()){ + clientIterator = clients.iterator(); + } + + currentClient = clientIterator.next(); + + lastServerSwitchTime = System.currentTimeMillis(); + + isSyncInProgress.set(false); + + rescheduleSemaphore.release(); + + } catch (InterruptedException e) { + // TODO: log + // Do not change client + } + + } + + private abstract class SubscriberCallback implements FutureCallback { + + protected final MessageFilterList filterList; + protected final FutureCallback> callback; + private final long invocationTime; + + public SubscriberCallback(MessageFilterList filterList, FutureCallback> callback) { + + this.filterList = filterList; + this.callback = callback; + this.invocationTime = System.currentTimeMillis(); + + } + + /** + * Handles resyncing process for the given subscription after a server is switched + * Specifically: generates a sync query from the local database and uses it to query the current server + */ + private void reSync() { + + SyncQuery syncQuery = null; + try { + + syncQuery = localClient.generateSyncQuery(GenerateSyncQueryParams.newBuilder() + .setFilterList(filterList) + .addAllBreakpointList(Arrays.asList(BREAKPOINTS)) + .build()); + + } catch (CommunicationException e) { + + // Handle failure in standard way + onFailure(e); + + } + + currentClient.querySync(syncQuery, new SyncQueryCallback(filterList, callback)); + + } + + /** + * Reschedules the subscription + */ + private void reschedule() { + + try { + + rescheduleSemaphore.acquire(); + + reSync(); + + rescheduleSemaphore.release(); + + + } catch (InterruptedException e) { + + //TODO: log + + callback.onFailure(e); // Hard error: Cannot guarantee subscription safety + + } + + } + + @Override + public void onFailure(Throwable t) { + + // If server failure is not already known: switch to next client and resync + if (invocationTime > lastServerSwitchTime){ + + // Make sure only what thread switches the client + if (isSyncInProgress.compareAndSet(false, true)){ + nextClient(); + } + + } + + reschedule(); + + } + + } + + /** + * Provides handling logic for resync query callback operation + * Receives a SyncQueryResponse and reads the missing data (starting from the received timestamp) if needed + */ + protected class SyncQueryCallback extends SubscriberCallback { + + public SyncQueryCallback (MessageFilterList filterList, FutureCallback> callback) { + + super(filterList, callback); + + } + + @Override + public void onSuccess(SyncQueryResponse result) { + + final Timestamp DEFAULT_TIME = BulletinBoardUtils.toTimestampProto(946728000); // Year 2000 + + // Read required messages according to received Timestamp + + Timestamp syncTimestamp; + + if (result.hasLastTimeOfSync()) { + syncTimestamp = result.getLastTimeOfSync(); // Use returned time of sync + } else { + syncTimestamp = DEFAULT_TIME; // Get all messages + } + + MessageFilterList timestampedFilterList = filterList.toBuilder() + .removeFilter(filterList.getFilterCount()-1) // Remove MIN_ENTRY filter + .addFilter(MessageFilter.newBuilder() // Add timestamp filter + .setType(AFTER_TIME) + .setTimestamp(syncTimestamp) + .build()) + .build(); + + currentClient.readMessages(timestampedFilterList, new ReSyncCallback(filterList, callback, result.getLastEntryNum())); + + } + + } + + /** + * Provides handling logic for callback of resyncing process + * Receives the missing messages, handles them and resubscribes + */ + protected class ReSyncCallback extends SubscriberCallback> { + + private long minEntry; + + public ReSyncCallback (MessageFilterList filterList, FutureCallback> callback, long minEntry) { + + super(filterList, callback); + + this.minEntry = minEntry; + + } + + @Override + public void onSuccess(List result) { + + // Propagate result to caller + callback.onSuccess(result); + + // Renew subscription + + MessageFilterList newFilterList = filterList.toBuilder() + .removeFilter(filterList.getFilterCount()-1) // Remove current MIN_ENTRY filter + .addFilter(MessageFilter.newBuilder() // Add new MIN_ENTRY filter for current server + .setType(MIN_ENTRY) + .setEntry(minEntry) + .build()) + .build(); + + currentClient.subscribe(newFilterList, callback); + + } + + } + + /** + * Provides the handling logic for results and failures of main subscription (while there are no errors) + */ + protected class SubscriptionCallback extends SubscriberCallback> { + + public SubscriptionCallback(MessageFilterList filterList, FutureCallback> callback){ + super(filterList, callback); + } + + + @Override + public void onSuccess(List result) { + + // Propagate result to caller + callback.onSuccess(result); + + } + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + currentClient.subscribe(filterList, startEntry, new SubscriptionCallback(filterList, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); + } + + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java index 10517f7..0401a76 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java @@ -2,6 +2,7 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.rest.Constants; @@ -11,6 +12,10 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; + import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; @@ -45,17 +50,19 @@ public class SingleServerGetRedundancyWorker extends SingleServerWorker inputStream = null; // Retrieve answer try { - // If a BulletinBoardMessageList is returned: the read was successful - BulletinBoardMessageList msgList = response.readEntity(BulletinBoardMessageList.class); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BulletinBoardMessage.class); - if (msgList.getMessageList().size() > 0){ + if (inputStream.asList().size() > 0){ // Message exists in the server return 1.0f; } @@ -64,14 +71,15 @@ public class SingleServerGetRedundancyWorker extends SingleServerWorker testDB = new LinkedList<>(); - testDB.add(BASE_URL); - - bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() - .addAllBulletinBoardAddress(testDB) - .setMinRedundancy((float) 1.0) - .build()); - postCallback = new PostCallback(); redundancyCallback = new RedundancyCallback((float) 1.0); @@ -330,11 +308,8 @@ public class ThreadedBulletinBoardClientIntegrationTest { * Closes the client and makes sure the test fails when an exception occurred in a separate thread */ - @After public void close() { - bulletinBoardClient.close(); - if (thrown.size() > 0) { assert false; } @@ -344,7 +319,6 @@ public class ThreadedBulletinBoardClientIntegrationTest { /** * Tests the standard post, redundancy and read methods */ - @Test public void postTest() { byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; @@ -424,7 +398,6 @@ public class ThreadedBulletinBoardClientIntegrationTest { * Also tests not being able to post to a closed batch * @throws CommunicationException, SignatureException, InterruptedException */ - @Test public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { final int SIGNER = 1; @@ -447,15 +420,7 @@ public class ThreadedBulletinBoardClientIntegrationTest { // Close batch - CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() - .setBatchId(BATCH_ID) - .setBatchLength(BATCH_LENGTH) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(50) - .setNanos(80) - .build()) - .setSig(completeBatch.getSignature()) - .build(); + CloseBatchMessage closeBatchMessage = completeBatch.getCloseBatchMessage(); bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); @@ -490,10 +455,9 @@ public class ThreadedBulletinBoardClientIntegrationTest { /** * Posts a complete batch message - * Checks reading od the message + * Checks reading of the message * @throws CommunicationException, SignatureException, InterruptedException */ - @Test public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { final int SIGNER = 0; @@ -529,7 +493,6 @@ public class ThreadedBulletinBoardClientIntegrationTest { * Tests that an unopened batch cannot be closed * @throws CommunicationException, InterruptedException */ - @Test public void testInvalidBatchClose() throws CommunicationException, InterruptedException { final int NON_EXISTENT_BATCH_ID = 999; diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java new file mode 100644 index 0000000..a91f8d6 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -0,0 +1,231 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import meerkat.comm.CommunicationException; +import meerkat.crypto.concrete.ECDSASignature; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardMessageGenerator; + +import java.io.IOException; +import java.io.InputStream; +import java.security.*; +import java.security.cert.CertificateException; +import java.util.*; +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Created by Arbel Deutsch Peled on 22-Mar-16. + */ +public class GenericSubscriptionClientTester { + + private GenericBatchDigitalSignature signers[]; + private ByteString[] signerIDs; + + private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; + private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12"; + + private static String KEYFILE_PASSWORD1 = "secret"; + private static String KEYFILE_PASSWORD3 = "shh"; + + private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; + private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; + + private SubscriptionAsyncBulletinBoardClient bulletinBoardClient; + + private Random random; + private BulletinBoardMessageGenerator generator; + + private Semaphore jobSemaphore; + private Vector thrown; + + public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){ + + this.bulletinBoardClient = bulletinBoardClient; + + signers = new GenericBatchDigitalSignature[2]; + signerIDs = new ByteString[signers.length]; + signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + + InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); + char[] password = KEYFILE_PASSWORD1.toCharArray(); + + KeyStore.Builder keyStoreBuilder; + try { + keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); + + signers[0].loadSigningCertificate(keyStoreBuilder); + + signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); + + keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); + password = KEYFILE_PASSWORD3.toCharArray(); + + keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); + signers[1].loadSigningCertificate(keyStoreBuilder); + + signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); + + for (int i = 0 ; i < signers.length ; i++) { + signerIDs[i] = signers[i].getSignerID(); + } + + } catch (IOException e) { + System.err.println("Failed reading from signature file " + e.getMessage()); + fail("Failed reading from signature file " + e.getMessage()); + } catch (CertificateException e) { + System.err.println("Failed reading certificate " + e.getMessage()); + fail("Failed reading certificate " + e.getMessage()); + } catch (KeyStoreException e) { + System.err.println("Failed reading keystore " + e.getMessage()); + fail("Failed reading keystore " + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + System.err.println("Couldn't find signing algorithm " + e.getMessage()); + fail("Couldn't find signing algorithm " + e.getMessage()); + } catch (UnrecoverableKeyException e) { + System.err.println("Couldn't find signing key " + e.getMessage()); + fail("Couldn't find signing key " + e.getMessage()); + } + + } + + /** + * Takes care of initializing the client and the test resources + */ + public void init(){ + + random = new Random(0); // We use insecure randomness in tests for repeatability + generator = new BulletinBoardMessageGenerator(random); + + thrown = new Vector<>(); + jobSemaphore = new Semaphore(0); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + public void close() { + + if (thrown.size() > 0) { + assert false; + } + + } + + private class SubscriptionCallback implements FutureCallback>{ + + private int stage; + private final List> expectedMessages; + private final List messagesToPost; + private final BulletinBoardMessageComparator comparator; + + public SubscriptionCallback(List> expectedMessages, List messagesToPost) { + + this.expectedMessages = expectedMessages; + this.messagesToPost = messagesToPost; + this.stage = 0; + this.comparator = new BulletinBoardMessageComparator(); + + } + + @Override + public void onSuccess(List result) { + + if (stage >= expectedMessages.size()) + return; + + // Check for consistency + + List expectedMsgList = expectedMessages.get(stage); + + if (expectedMsgList.size() != result.size()){ + onFailure(new AssertionError("Received wrong number of messages")); + return; + } + + Iterator expectedMessageIterator = expectedMsgList.iterator(); + Iterator receivedMessageIterator = result.iterator(); + + while (expectedMessageIterator.hasNext()) { + if(comparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()) != 0){ + onFailure(new AssertionError("Received unexpected message")); + return; + } + } + + // Post new message + try { + if (stage < messagesToPost.size()) { + bulletinBoardClient.postMessage(messagesToPost.get(stage)); + } + } catch (CommunicationException e) { + onFailure(e); + return; + } + + stage++; + jobSemaphore.release(); + } + + @Override + public void onFailure(Throwable t) { + System.err.println(t.getCause() + " " + t.getMessage()); + thrown.add(t); + jobSemaphore.release(expectedMessages.size()); + stage = expectedMessages.size(); + } + } + + public void subscriptionTest() throws SignatureException, CommunicationException { + + final int FIRST_POST_ID = 201; + final int SECOND_POST_ID = 202; + final String COMMON_TAG = "SUBSCRIPTION_TEST"; + + List tags = new LinkedList<>(); + tags.add(COMMON_TAG); + + BulletinBoardMessage msg1 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(1000).setNanos(900).build(), 10, 4, tags); + BulletinBoardMessage msg2 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(800).setNanos(300).build(), 10, 4); + BulletinBoardMessage msg3 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(2000).setNanos(0).build(), 10, 4, tags); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(COMMON_TAG) + .build()) + .build(); + + List> expectedMessages = new ArrayList<>(3); + expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList()); + expectedMessages.get(0).add(msg1); + expectedMessages.get(2).add(msg3); + + List messagesToPost = new ArrayList<>(2); + messagesToPost.add(msg2); + messagesToPost.add(msg3); + + bulletinBoardClient.postMessage(msg1); + bulletinBoardClient.subscribe(filterList, new SubscriptionCallback(expectedMessages, messagesToPost)); + + try { + jobSemaphore.acquire(3); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java new file mode 100644 index 0000000..d2039ae --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -0,0 +1,118 @@ +package meerkat.bulletinboard; + +import meerkat.bulletinboard.sqlserver.*; +import meerkat.comm.CommunicationException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class LocalBulletinBoardClientTest { + + private static final int THREAD_NUM = 3; + private static final String DB_NAME = "TestDB"; + + private static final int SUBSRCIPTION_DELAY = 3000; + + // Testers + private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; + + public LocalBulletinBoardClientTest() throws CommunicationException { + + H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME) ; + + try { + + Connection conn = queryProvider.getDataSource().getConnection(); + Statement stmt = conn.createStatement(); + + List deletionQueries = queryProvider.getSchemaDeletionCommands(); + + for (String deletionQuery : deletionQueries) { + stmt.execute(deletionQuery); + } + + } catch (SQLException e) { + System.err.println(e.getMessage()); + throw new CommunicationException(e.getCause() + " " + e.getMessage()); + } + + BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); + server.init(DB_NAME); + + LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); + subscriptionTester = new GenericSubscriptionClientTester(client); + clientTest = new GenericBulletinBoardClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void postTest() { + + clientTest.postTest(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + clientTest.testInvalidBatchClose(); + + } + + @Test + public void testSubscription() throws SignatureException, CommunicationException { + subscriptionTester.init(); + subscriptionTester.subscriptionTest(); + subscriptionTester.close(); + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java new file mode 100644 index 0000000..b69cf72 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java @@ -0,0 +1,95 @@ +package meerkat.bulletinboard; + +import meerkat.comm.CommunicationException; + +import meerkat.protobuf.Voting.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class ThreadedBulletinBoardClientIntegrationTest { + + // Server data + + private static String PROP_GETTY_URL = "gretty.httpBaseURI"; + private static String DEFAULT_BASE_URL = "http://localhost:8081"; + private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); + + // Tester + private GenericBulletinBoardClientTester clientTest; + + public ThreadedBulletinBoardClientIntegrationTest(){ + + ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(); + + List testDB = new LinkedList<>(); + testDB.add(BASE_URL); + + client.init(BulletinBoardClientParams.newBuilder() + .addAllBulletinBoardAddress(testDB) + .setMinRedundancy((float) 1.0) + .build()); + + clientTest = new GenericBulletinBoardClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void postTest() { + + clientTest.postTest(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + clientTest.testInvalidBatchClose(); + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index 9fe54ce..ab82ab1 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -1,33 +1,38 @@ package meerkat.bulletinboard.sqlserver; -import com.google.protobuf.ByteString; -import com.google.protobuf.ProtocolStringList; +import java.sql.*; +import java.util.*; + +import com.google.protobuf.*; + +import com.google.protobuf.Timestamp; import meerkat.bulletinboard.*; import meerkat.bulletinboard.sqlserver.mappers.*; +import static meerkat.bulletinboard.BulletinBoardConstants.*; + import meerkat.comm.CommunicationException; + import meerkat.comm.MessageOutputStream; import meerkat.crypto.concrete.ECDSASignature; import meerkat.crypto.concrete.SHA256Digest; + import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Crypto.SignatureVerificationKey; + + +import static meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider.*; + +import javax.sql.DataSource; + import meerkat.util.BulletinBoardUtils; import meerkat.util.TimestampComparator; +import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.support.GeneratedKeyHolder; import org.springframework.jdbc.support.KeyHolder; -import javax.sql.DataSource; -import java.sql.SQLException; -import java.sql.Types; -import java.util.*; - -import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_ID_TAG_PREFIX; -import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_TAG; -import static meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider.FilterTypeParam; -import static meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider.QueryType; - /** @@ -583,14 +588,12 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } - /** - * Used to retrieve just basic information about messages to allow calculation of checksum + * Private implementation of the message stub reader for returning result as a list * @param filterList is a filter list that defines which messages the client is interested in - * @return a list of Bulletin Board Messages that contain just the entry number, timestamp and message ID for each message - * The message ID is returned inside the message data field + * @return the requested list of message stubs */ - protected List readMessageStubs(MessageFilterList filterList) { + private List readMessageStubs(MessageFilterList filterList) { StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1)); @@ -632,6 +635,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + /** * This method returns a string representation of the tag associated with a batch ID * @param batchId is the given batch ID @@ -641,6 +645,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BATCH_ID_TAG_PREFIX + Integer.toString(batchId); } + /** * This method checks if a specified batch exists and is already closed * @param signerId is the ID of the publisher of the batch @@ -684,6 +689,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public BoolMsg beginBatch(BeginBatchMessage message) throws CommunicationException { @@ -716,8 +722,10 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ jdbcTemplate.batchUpdate(sql,namedParameters); return BoolMsg.newBuilder().setValue(true).build(); + } + @Override public BoolMsg postBatchMessage(BatchMessage batchMessage) throws CommunicationException{ @@ -741,6 +749,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public BoolMsg closeBatchMessage(CloseBatchMessage message) throws CommunicationException { @@ -764,13 +773,12 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BoolMsg.newBuilder().setValue(false).build(); } - // Get Tags and add them to CompleteBatch sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_TAGS); namedParameters = new MapSqlParameterSource(); - namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(0),signerId); + namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(0),signerId.toByteArray()); namedParameters.addValue(QueryType.GET_BATCH_TAGS.getParamName(1),batchId); List tags = jdbcTemplate.query(sql, namedParameters, new StringMapper()); @@ -844,6 +852,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ return BoolMsg.newBuilder().setValue(true).build(); } + @Override public void readBatch(BatchSpecificationMessage message, MessageOutputStream out) throws CommunicationException, IllegalArgumentException{ @@ -863,6 +872,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + /** * Finds the entry number of the last entry in the database * @return the entry number, or -1 if no entries are found @@ -881,10 +891,80 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) { + + if (generateSyncQueryParams == null + || !generateSyncQueryParams.hasFilterList() + || generateSyncQueryParams.getFilterList().getFilterCount() <= 0 + || generateSyncQueryParams.getBreakpointListCount() <= 0){ + + return SyncQuery.getDefaultInstance(); + + } + + List messages = readMessageStubs(generateSyncQueryParams.getFilterList()); + + if (messages.size() <= 0){ + return SyncQuery.newBuilder().build(); + } + + SyncQuery.Builder resultBuilder = SyncQuery.newBuilder(); + + Iterator messageIterator = messages.iterator(); + Iterator breakpointIterator = generateSyncQueryParams.getBreakpointListList().iterator(); + + Checksum checksum = new SimpleChecksum(); + checksum.setDigest(new SHA256Digest()); + + Timestamp lastTimestamp = Timestamp.getDefaultInstance(); + BulletinBoardMessage message = messageIterator.next(); + long currentMessageNum = 1; + + boolean checksumChanged = true; + + while (breakpointIterator.hasNext()){ + + Float breakpoint = breakpointIterator.next(); + + // Continue while breakpoint not reached, or it has been reached but no new timestamp has been encountered since + while ( messageIterator.hasNext() + && ((float) currentMessageNum / (float) messages.size() <= breakpoint) + || ((float) currentMessageNum / (float) messages.size() > breakpoint + && lastTimestamp.equals(message.getMsg().getTimestamp()))){ + + checksumChanged = true; + + checksum.update(message.getMsg().getData()); + + lastTimestamp = message.getMsg().getTimestamp(); + message = messageIterator.next(); + + } + + if (checksumChanged) { + + checksum.update(message.getMsg().getData()); + resultBuilder.addQuery(SingleSyncQuery.newBuilder() + .setTimeOfSync(message.getMsg().getTimestamp()) + .setChecksum(checksum.getChecksum()) + .build()); + + } + + checksumChanged = false; + + } + + return resultBuilder.build(); + + } + + /** * Searches for the latest time of sync of the DB relative to a given query and returns the metadata needed to complete the sync - * The checksum up to (and including) each given timestamp is calculated using bitwise XOR on 8-byte sized blocks of the message IDs - * @param syncQuery contains a succinct representation of states to compare to + * The checksum up to (and including) each given timestamp is calculated using an instance of SimpleChecksum + * @param syncQuery contains a succinct representation of states to compare against * @return the current last entry num and latest time of sync if there is one; -1 as last entry and empty timestamp otherwise * @throws CommunicationException */ @@ -957,6 +1037,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ } + @Override public void close() {} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java index c390048..a54c2ff 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java @@ -158,10 +158,10 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider + " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)"; case BEFORE_TIME: - return "MsgTable.ExactTime <= :TimeStamp"; + return "MsgTable.ExactTime <= :TimeStamp" + serialString; case AFTER_TIME: - return "MsgTable.ExactTime >= :TimeStamp"; + return "MsgTable.ExactTime >= :TimeStamp" + serialString; default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); @@ -186,6 +186,11 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider case TAG: return "VARCHAR"; + case AFTER_TIME: // Go through + case BEFORE_TIME: + return "TIMESTAMP"; + + default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java index bdba241..71ba742 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageCallbackHandler.java @@ -20,9 +20,9 @@ import java.util.List; */ public class MessageCallbackHandler implements RowCallbackHandler { - NamedParameterJdbcTemplate jdbcTemplate; - SQLQueryProvider sqlQueryProvider; - MessageOutputStream out; + private final NamedParameterJdbcTemplate jdbcTemplate; + private final SQLQueryProvider sqlQueryProvider; + private final MessageOutputStream out; public MessageCallbackHandler(NamedParameterJdbcTemplate jdbcTemplate, SQLQueryProvider sqlQueryProvider, MessageOutputStream out) { diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java new file mode 100644 index 0000000..f81cc76 --- /dev/null +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/mappers/MessageStubCallbackHandler.java @@ -0,0 +1,59 @@ +package meerkat.bulletinboard.sqlserver.mappers; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider.QueryType; +import meerkat.comm.MessageOutputStream; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.UnsignedBulletinBoardMessage; +import meerkat.protobuf.Crypto; +import meerkat.util.BulletinBoardUtils; +import org.springframework.jdbc.core.RowCallbackHandler; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 21-Feb-16. + */ +public class MessageStubCallbackHandler implements RowCallbackHandler { + + private final MessageOutputStream out; + + public MessageStubCallbackHandler(MessageOutputStream out) { + + this.out = out; + + } + + @Override + public void processRow(ResultSet rs) throws SQLException { + + BulletinBoardMessage result; + + result = BulletinBoardMessage.newBuilder() + .setEntryNum(rs.getLong(1)) + .setMsg(UnsignedBulletinBoardMessage.newBuilder() + .setData(ByteString.copyFrom(rs.getBytes(2))) + .setTimestamp(BulletinBoardUtils.toTimestampProto(rs.getTimestamp(3))) + .build()) + .build(); + + try { + + out.writeMessage(result); + + } catch (IOException e) { + + //TODO: log + e.printStackTrace(); + + } + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index 2e4782f..7c0f7fa 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -22,7 +22,7 @@ import static meerkat.rest.Constants.*; import java.io.IOException; import java.io.OutputStream; -import java.util.List; +import java.util.Collection; /** * An implementation of the BulletinBoardServer which functions as a WebApp @@ -183,6 +183,21 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } } + @Path(GENERATE_SYNC_QUERY_PATH) + @POST + @Consumes(MEDIATYPE_PROTOBUF) + @Produces(MEDIATYPE_PROTOBUF) + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + try { + init(); + return bulletinBoard.generateSyncQuery(generateSyncQueryParams); + } catch (CommunicationException | IllegalArgumentException e) { + System.err.println(e.getMessage()); + return null; + } + } + @Path(READ_BATCH_PATH) @POST @Consumes(MEDIATYPE_PROTOBUF) diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index c6e330c..d74c1ea 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -11,10 +11,6 @@ import java.util.List; */ public interface AsyncBulletinBoardClient extends BulletinBoardClient { - public interface MessageHandler { - void handleNewMessages(List messageList); - } - /** * Post a message to the bulletin board in an asynchronous manner * @param msg is the message to be posted @@ -100,11 +96,12 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { */ public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback); + /** - * Subscribes to a notifier that will return any new messages on the server that match the given filters - * @param filterList defines the set of filters for message retrieval - * @param messageHandler defines the handler for new messages received + * Perform a Sync Query on the bulletin board + * @param syncQuery defines the query + * @param callback is a callback for handling the result of the query */ - public void subscribe(MessageFilterList filterList, MessageHandler messageHandler); + public void querySync(SyncQuery syncQuery, FutureCallback callback); } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 2f5a3df..245eddf 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -5,6 +5,7 @@ import meerkat.protobuf.Voting.*; import static meerkat.protobuf.BulletinBoardAPI.*; +import java.util.Collection; import java.util.List; /** @@ -26,8 +27,6 @@ public interface BulletinBoardClient { */ MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException; - - /** * Check how "safe" a given message is in a synchronous manner * @param id is the unique message identifier for retrieval @@ -40,11 +39,21 @@ public interface BulletinBoardClient { * Note that if messages haven't been "fully posted", this might return a different * set of messages in different calls. However, messages that are fully posted * are guaranteed to be included. - * @param filterList return only messages that match the filters (null means no filtering). + * @param filterList return only messages that match the filters (null means no filtering) * @return the list of messages */ List readMessages(MessageFilterList filterList); + /** + * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list + * Should only be called on instances for which the actual server contacted is known (i.e. there is only one server) + * @param GenerateSyncQueryParams defines the required information needed to generate the query + * These are represented as fractions of the total number of relevant messages + * @return The generated SyncQuery + * @throws CommunicationException when no DB can be contacted + */ + SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException; + /** * Closes all connections, if any. * This is done in a synchronous (blocking) way. diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java index 66652f8..9ddee90 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardConstants.java @@ -8,6 +8,7 @@ public interface BulletinBoardConstants { // Relative addresses for Bulletin Board operations public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver"; + public static final String GENERATE_SYNC_QUERY_PATH = "/generatesyncquery"; public static final String READ_MESSAGES_PATH = "/readmessages"; public static final String READ_BATCH_PATH = "/readbatch"; public static final String POST_MESSAGE_PATH = "/postmessage"; diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index 0b279c2..e458dbc 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -4,6 +4,8 @@ import meerkat.comm.CommunicationException; import meerkat.comm.MessageOutputStream; import meerkat.protobuf.BulletinBoardAPI.*; +import java.util.Collection; + /** * Created by Arbel on 07/11/15. @@ -28,7 +30,7 @@ public interface BulletinBoardServer{ * @throws CommunicationException on DB connection error */ public BoolMsg postMessage(BulletinBoardMessage msg) throws CommunicationException; - + /** * Read all messages posted matching the given filter * @param filterList return only messages that match the filters (empty list or null means no filtering) @@ -77,6 +79,13 @@ public interface BulletinBoardServer{ */ public void readBatch(BatchSpecificationMessage message, MessageOutputStream out) throws CommunicationException, IllegalArgumentException; + /** + * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list + * @param generateSyncQueryParams defines the information needed to generate the query + * @return The generated SyncQuery + * @throws CommunicationException on DB connection error + */ + SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException; /** * Queries the database for sync status with respect to a given sync query diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java new file mode 100644 index 0000000..85eb2cc --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSubscriber.java @@ -0,0 +1,32 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; + +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * This interface defines the behaviour required from a subscription service to Bulletin Board messages + */ +public interface BulletinBoardSubscriber { + + /** + * Subscribes to a notifier that will return any new messages on the server that match the given filters + * In case of communication error: the subscription is terminated + * @param filterList defines the set of filters for message retrieval + * @param callback defines how to handle new messages received and/or a failures in communication + */ + public void subscribe(MessageFilterList filterList, FutureCallback> callback); + + /** + * Subscribes to a notifier that will return any new messages on the server that match the given filters + * In case of communication error: the subscription is terminated + * @param filterList defines the set of filters for message retrieval + * @param startEntry defines the first entry number to consider + * @param callback defines how to handle new messages received and/or a failures in communication + */ + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback); + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java new file mode 100644 index 0000000..4b07225 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java @@ -0,0 +1,22 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 08-Mar-16. + * This interface defines the behaviour of a bulletin board synchronizer + * This is used to make sure that data in a specific instance of a bulletin board server is duplicated to a sufficient percentage of the other servers + */ +public interface BulletinBoardSynchronizer extends Runnable{ + + /** + * + * @param localClient is a client for the local DB instance + * @param remoteClient is a client for the remote DBs + * @param minRedundancy + */ + public void init(BulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, float minRedundancy); + + @Override + public void run(); + + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java index 14e87e7..649fd8b 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/CompleteBatch.java @@ -139,7 +139,12 @@ public class CompleteBatch { @Override public String toString() { + + if (beginBatchMessage == null || beginBatchMessage.getSignerId() == null) + return "Unspecified batch " + super.toString(); + return "Batch " + beginBatchMessage.getSignerId().toString() + ":" + beginBatchMessage.getBatchId(); + } } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java new file mode 100644 index 0000000..b07e655 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/SubscriptionAsyncBulletinBoardClient.java @@ -0,0 +1,7 @@ +package meerkat.bulletinboard; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + */ +public interface SubscriptionAsyncBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber { +} diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java index 5ca3e0b..dff562e 100644 --- a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java @@ -8,6 +8,8 @@ import com.google.protobuf.Timestamp; import java.math.BigInteger; import java.security.SignatureException; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Random; /** @@ -36,35 +38,36 @@ public class BulletinBoardMessageGenerator { * @param timestamp contains the time used in the message * @param dataSize is the length of the data contained in the message * @param tagNumber is the number of tags to generate + * @param tags is a list of initial tags (on top of which more will be added according to the method input) * @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp */ - - public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber) - throws SignatureException { + public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber, List tags) + throws SignatureException{ // Generate random data. byte[] data = new byte[dataSize]; - String[] tags = new String[tagNumber]; + String[] newTags = new String[tagNumber]; for (int i = 0; i < dataSize; i++) { data[i] = randomByte(); } for (int i = 0; i < tagNumber; i++) { - tags[i] = randomString(); + newTags[i] = randomString(); } UnsignedBulletinBoardMessage unsignedMessage = UnsignedBulletinBoardMessage.newBuilder() .setData(ByteString.copyFrom(data)) .setTimestamp(timestamp) - .addAllTag(Arrays.asList(tags)) + .addAllTag(tags) + .addAllTag(Arrays.asList(newTags)) .build(); BulletinBoardMessage.Builder messageBuilder = BulletinBoardMessage.newBuilder() - .setMsg(unsignedMessage); + .setMsg(unsignedMessage); for (int i = 0 ; i < signers.length ; i++) { signers[i].updateContent(unsignedMessage); @@ -75,6 +78,23 @@ public class BulletinBoardMessageGenerator { } + /** + * Generates a complete instance of a BulletinBoardMessage + * @param signers contains the (possibly multiple) credentials required to sign the message + * @param timestamp contains the time used in the message + * @param dataSize is the length of the data contained in the message + * @param tagNumber is the number of tags to generate + * @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp + */ + + public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber) + throws SignatureException { + + List tags = new LinkedList<>(); + return generateRandomMessage(signers, timestamp, dataSize, tagNumber, tags); + + } + /** * Generates a complete instance of a BulletinBoardMessage * @param signers contains the (possibly multiple) credentials required to sign the message diff --git a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto index fd95503..1136e16 100644 --- a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto +++ b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto @@ -146,6 +146,18 @@ message SyncQuery { } +// This message defines the required information for generation of a SyncQuery instance by the server +message GenerateSyncQueryParams { + + // Defines the set of messages required + MessageFilterList filterList = 1; + + // Defines the locations in the list of messages to calculate single sync queries for + // The values should be between 0.0 and 1.0 and define the location in fractions of the size of the message set + repeated float breakpointList = 2; + +} + // This message defines the server's response format to a sync query message SyncQueryResponse {