From 53d609bfee483c348f6d35396c817fa6aecc95d1 Mon Sep 17 00:00:00 2001 From: "arbel.peled" Date: Tue, 28 Jun 2016 08:19:29 +0300 Subject: [PATCH] Removed database address from Bulletin Board Server init method (the address is given to the Query Provider). Added integration tests for Single Server Bulletin Board Client. Fixed subscription logic in the Single Server Bulletin Board Client. Decoupled Single Server- and Simple- Bulletin Board Clients (Simple-... is now obsolete). Fixed some bugs. Threaded Bulletin Board Client now has some errors in integration. --- .../CachedBulletinBoardClient.java | 45 ++- .../SimpleBulletinBoardClient.java | 13 +- .../SingleServerBulletinBoardClient.java | 284 ++++++++++++++++-- .../ThreadedBulletinBoardSubscriber.java | 2 - .../SingleServerGenerateSyncQueryWorker.java | 55 ++++ .../BulletinBoardSynchronizerTest.java | 4 +- .../CachedBulletinBoardClientTest.java | 111 +++++++ .../GenericSubscriptionClientTester.java | 8 +- .../LocalBulletinBoardClientTest.java | 27 +- ...verBulletinBoardClientIntegrationTest.java | 104 +++++++ .../sqlserver/BulletinBoardSQLServer.java | 2 +- .../webapp/BulletinBoardWebApp.java | 10 +- .../H2BulletinBoardServerTest.java | 2 +- .../MySQLBulletinBoardServerTest.java | 2 +- .../SQLiteBulletinBoardServerTest.java | 2 +- .../bulletinboard/BulletinBoardClient.java | 3 +- .../bulletinboard/BulletinBoardServer.java | 2 +- 17 files changed, 575 insertions(+), 101 deletions(-) create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java create mode 100644 bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java create mode 100644 bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index 6594b0c..cdb86cb 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -23,12 +23,16 @@ import java.util.List; public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient { private final AsyncBulletinBoardClient localClient; - private AsyncBulletinBoardClient remoteClient; - private BulletinBoardSubscriber subscriber; - private BulletinBoardSynchronizer synchronizer; + private final AsyncBulletinBoardClient remoteClient; + private final AsyncBulletinBoardClient queueClient; + private final BulletinBoardSubscriber subscriber; + private final BulletinBoardSynchronizer synchronizer; private Thread syncThread; + private final static int DEFAULT_WAIT_CAP = 3000; + private final static int DEFAULT_SLEEP_INTERVAL = 3000; + private class SubscriptionStoreCallback implements FutureCallback> { private final FutureCallback> callback; @@ -81,21 +85,36 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien * @param localClient is a Client for the local instance * @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations * @param subscriber is a subscription service to the remote instance(s) - * @param queue is a client for a local deletable server to be used as a queue for not-yet-uploaded messages + * @param queueClient is a client for a local deletable server to be used as a queue for not-yet-uploaded messages */ + public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, + AsyncBulletinBoardClient remoteClient, + BulletinBoardSubscriber subscriber, + DeletableSubscriptionBulletinBoardClient queueClient, + int sleepInterval, + int waitCap) { + + this.localClient = localClient; + this.remoteClient = remoteClient; + this.subscriber = subscriber; + this.queueClient = queueClient; + + this.synchronizer = new SimpleBulletinBoardSynchronizer(sleepInterval,waitCap); + synchronizer.init(queueClient, remoteClient); + syncThread = new Thread(synchronizer); + syncThread.start(); + } + + /** + * Creates a Cached Client + * Used default values foe the time caps + * */ public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, BulletinBoardSubscriber subscriber, DeletableSubscriptionBulletinBoardClient queue) { - this.localClient = localClient; - this.remoteClient = remoteClient; - this.subscriber = subscriber; - - this.synchronizer = new SimpleBulletinBoardSynchronizer(); - synchronizer.init(queue, remoteClient); - syncThread = new Thread(synchronizer); - syncThread.start(); + this(localClient, remoteClient, subscriber, queue, DEFAULT_SLEEP_INTERVAL, DEFAULT_WAIT_CAP); } @Override @@ -385,7 +404,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien } @Override - public float getRedundancy(MessageID id) { + public float getRedundancy(MessageID id) throws CommunicationException { return remoteClient.getRedundancy(id); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 521815d..a274f53 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -68,16 +68,11 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ // Post message to all databases try { for (String db : meerkatDBs) { - webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH); - response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF)); - // Only consider valid responses - if (response.getStatusInfo() == Response.Status.OK - || response.getStatusInfo() == Response.Status.CREATED) { - response.readEntity(BoolValue.class).getValue(); - } else { - throw new CommunicationException("Server returned error. Status was: " + response.getStatus()); - } + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(db, msg, 0); + + worker.call(); + } } catch (Exception e) { // Occurs only when server replies with valid status but invalid data throw new CommunicationException("Error accessing database: " + e.getMessage()); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 86ede70..be75dfe 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -8,15 +8,15 @@ import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI; +import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.util.BulletinBoardUtils; +import javax.ws.rs.client.Client; import java.lang.Iterable; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -31,11 +31,17 @@ import java.util.concurrent.atomic.AtomicInteger; * If the list of servers contains more than one server: the server actually used is the first one * The class further implements a delayed access to the server after a communication error occurs */ -public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient { +public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoardClient { + + protected Client client; + + protected BulletinBoardDigest digest; + + private String dbAddress; private final int MAX_RETRIES = 11; - private ListeningScheduledExecutorService executorService; + private final ListeningScheduledExecutorService executorService; private long lastServerErrorTime; @@ -54,6 +60,43 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } + private class SynchronousRetry { + + private final SingleServerWorker worker; + + private String thrown; + + public SynchronousRetry(SingleServerWorker worker) { + this.worker = worker; + this.thrown = "Could not contact server. Errors follow:\n"; + } + + OUT run() throws CommunicationException { + + do { + + try { + return worker.call(); + } catch (Exception e) { + thrown += e.getCause() + " " + e.getMessage() + "\n"; + } + + try { + Thread.sleep(failDelayInMilliseconds); + } catch (InterruptedException e) { + //TODO: log + } + + worker.decMaxRetry(); + + } while (worker.isRetry()); + + throw new CommunicationException(thrown); + + } + + } + /** * This method adds a worker to the scheduled queue of the threadpool * If the server is in an accessible state: the job is submitted for immediate handling @@ -225,7 +268,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .build()) .build(); - SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); scheduleWorker(batchWorker, new ReadBatchCallback(msg, callback)); @@ -266,20 +309,28 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i if (callback != null) callback.onSuccess(result); - // Remove last filter from list (MIN_ENTRY one) - filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + // Update filter if needed - // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) - filterBuilder.addFilter(MessageFilter.newBuilder() - .setType(FilterType.MIN_ENTRY) - .setEntry(result.get(result.size() - 1).getEntryNum() + 1) - .build()); + if (result.size() > 0) { + + // Remove last filter from list (MIN_ENTRY one) + filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + + // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) + filterBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(result.get(result.size() - 1).getEntryNum() + 1) + .build()); + + } // Create new worker with updated task - worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), 1); + worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), MAX_RETRIES); - // Schedule the worker - scheduleWorker(worker, this); + RetryCallback> retryCallback = new RetryCallback<>(worker, this); + + // Schedule the worker to run after the given interval has elapsed + Futures.addCallback(executorService.schedule(worker, subscriptionIntervalInMilliseconds, TimeUnit.MILLISECONDS), retryCallback); } @@ -324,21 +375,192 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void init(BulletinBoardClientParams clientParams) { - // Perform usual setup - super.init(clientParams); + this.digest = new GenericBulletinBoardDigest(new SHA256Digest()); // Remove all but first DB address - String dbAddress = meerkatDBs.get(0); - meerkatDBs = new LinkedList<>(); - meerkatDBs.add(dbAddress); + this.dbAddress = clientParams.getBulletinBoardAddress(0); } + // Synchronous methods + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + retry.run(); + + digest.reset(); + digest.update(msg); + + return digest.digestAsMessageID(); + + } + + @Override + public float getRedundancy(MessageID id) throws CommunicationException { + + SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + @Override + public List readMessages(MessageFilterList filterList) throws CommunicationException { + + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); + + SynchronousRetry> retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + @Override + public MessageID postAsBatch(BulletinBoardMessage msg, int chunkSize) throws CommunicationException { + + // Begin the batch and obtain identifier + + BeginBatchMessage beginBatchMessage = BeginBatchMessage.newBuilder() + .addAllTag(msg.getMsg().getTagList()) + .build(); + + SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES); + + SynchronousRetry beginRetry = new SynchronousRetry<>(beginBatchWorker); + + Int64Value identifier = beginRetry.run(); + + // Post data chunks + + List batchChunkList = BulletinBoardUtils.breakToBatch(msg, chunkSize); + + BatchMessage.Builder builder = BatchMessage.newBuilder().setBatchId(identifier.getValue()); + + int position = 0; + + for (BatchChunk data : batchChunkList) { + + builder.setSerialNum(position).setData(data); + + SingleServerPostBatchWorker dataWorker = new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES); + + SynchronousRetry dataRetry = new SynchronousRetry<>(dataWorker); + + dataRetry.run(); + + // Increment position in batch + position++; + + } + + // Close batch + + CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() + .setBatchId(identifier.getValue()) + .addAllSig(msg.getSigList()) + .setTimestamp(msg.getMsg().getTimestamp()) + .setBatchLength(position) + .build(); + + SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(closeBatchWorker); + + retry.run(); + + // Calculate ID and return + + digest.reset(); + digest.update(msg); + + return digest.digestAsMessageID(); + + } + + @Override + public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException { + + // Retrieve message (which may be a stub) + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgID.getID()) + .build()) + .build(); + + SingleServerReadMessagesWorker stubWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); + + SynchronousRetry> retry = new SynchronousRetry<>(stubWorker); + + List messages = retry.run(); + + if (messages.size() <= 0) { + throw new CommunicationException("Could not find message in database."); + } + + BulletinBoardMessage msg = messages.get(0); + + if (msg.getMsg().getDataTypeCase() != UnsignedBulletinBoardMessage.DataTypeCase.MSGID) { + + // We retrieved a complete message. Return it. + return msg; + + } else { + + // We retrieved a stub. Retrieve data. + return readBatchData(msg); + + } + + } + + @Override + public BulletinBoardMessage readBatchData(BulletinBoardMessage stub) throws CommunicationException, IllegalArgumentException { + + BatchQuery batchQuery = BatchQuery.newBuilder() + .setMsgID(MessageID.newBuilder() + .setID(stub.getMsg().getMsgId()) + .build()) + .setStartPosition(0) + .build(); + + SingleServerReadBatchWorker readBatchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); + + SynchronousRetry> batchRetry = new SynchronousRetry<>(readBatchWorker); + + List batchChunkList = batchRetry.run(); + + return BulletinBoardUtils.gatherBatch(stub, batchChunkList); + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + + SingleServerGenerateSyncQueryWorker worker = + new SingleServerGenerateSyncQueryWorker(dbAddress, generateSyncQueryParams, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + // Asynchronous methods + @Override public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { // Create worker with redundancy 1 and MAX_RETRIES retries - SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES); + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -453,7 +675,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerBeginBatchWorker worker = - new SingleServerBeginBatchWorker(meerkatDBs.get(0), beginBatchMessage, MAX_RETRIES); + new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, new BeginBatchCallback(callback))); @@ -491,7 +713,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerPostBatchWorker worker = - new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES); + new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES); // Create worker with redundancy 1 and MAX_RETRIES retries scheduleWorker(worker, new RetryCallback<>(worker, listCallback)); @@ -529,7 +751,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerCloseBatchWorker worker = - new SingleServerCloseBatchWorker(meerkatDBs.get(0), closeBatchMessage, MAX_RETRIES); + new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -540,7 +762,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void getRedundancy(MessageID id, FutureCallback callback) { // Create worker with no retries - SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1); + SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, 1); // Submit job and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -551,7 +773,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void readMessages(MessageFilterList filterList, FutureCallback> callback) { // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, 1); // Submit job and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -575,7 +797,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .setStartPosition(0) .build(); - SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, MAX_RETRIES); + SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); // Submit jobs with wrapped callbacks scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, new CompleteMessageReadCallback(callback))); @@ -598,7 +820,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .setStartPosition(0) .build(); - SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); scheduleWorker(batchWorker, new RetryCallback<>(batchWorker, new ReadBatchCallback(stub, callback))); @@ -607,7 +829,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void querySync(SyncQuery syncQuery, FutureCallback callback) { - SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(meerkatDBs.get(0), syncQuery, MAX_RETRIES); + SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(dbAddress, syncQuery, MAX_RETRIES); scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -633,7 +855,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .build()); // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), MAX_RETRIES); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterListBuilder.build(), MAX_RETRIES); // Submit job and create callback that retries on failure and handles repeated subscription scheduleWorker(worker, new RetryCallback<>(worker, new SubscriptionCallback(worker, callback))); @@ -647,8 +869,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void close() { - super.close(); - executorService.shutdown(); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java index 9568255..252d6e3 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -250,7 +250,6 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber super(filterList, callback); } - @Override public void onSuccess(List result) { @@ -274,5 +273,4 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber subscribe(filterList, 0, callback); } - } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java new file mode 100644 index 0000000..71d90e0 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java @@ -0,0 +1,55 @@ +package meerkat.bulletinboard.workers.singleserver; + +import com.google.protobuf.Int64Value; +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.SyncQuery; +import meerkat.protobuf.BulletinBoardAPI.GenerateSyncQueryParams; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.GENERATE_SYNC_QUERY_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a Sync Query Generation operation + */ +public class SingleServerGenerateSyncQueryWorker extends SingleServerWorker { + + public SingleServerGenerateSyncQueryWorker(String serverAddress, GenerateSyncQueryParams payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + @Override + public SyncQuery call() throws Exception { + + Client client = clientLocal.get(); + + WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(GENERATE_SYNC_QUERY_PATH); + + Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF)); + + try { + + SyncQuery result = response.readEntity(SyncQuery.class); + return result; + + } catch (ProcessingException | IllegalStateException e) { + + // Post to this server failed + throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage()); + + } catch (Exception e) { + throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage()); + } + finally { + response.close(); + } + } +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java index 588f529..19e7ae5 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java @@ -114,7 +114,7 @@ public class BulletinBoardSynchronizerTest { public void init() throws CommunicationException { DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount)); - remoteServer.init(REMOTE_SERVER_ADDRESS); + remoteServer.init(); remoteClient = new LocalBulletinBoardClient( remoteServer, @@ -122,7 +122,7 @@ public class BulletinBoardSynchronizerTest { SUBSCRIPTION_INTERVAL); DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount)); - localServer.init(LOCAL_SERVER_ADDRESS); + localServer.init(); localClient = new LocalBulletinBoardClient( localServer, diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java new file mode 100644 index 0000000..83e62ff --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java @@ -0,0 +1,111 @@ +package meerkat.bulletinboard; + +import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer; +import meerkat.bulletinboard.sqlserver.H2QueryProvider; +import meerkat.comm.CommunicationException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel on 6/27/2016. + */ +public class CachedBulletinBoardClientTest { + + private static final int THREAD_NUM = 3; + + private static final String LOCAL_DB_NAME = "localDB"; + private static final String REMOTE_DB_NAME = "remoteDB"; + private static final String QUEUE_DB_NAME = "queueDB"; + + private static final int SUBSRCIPTION_DELAY = 500; + private static final int SYNC_DELAY = 500; + + // Testers + private CachedBulletinBoardClient cachedClient; + private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; + + public CachedBulletinBoardClientTest() throws CommunicationException { + + DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_DB_NAME)); + localServer.init(); + LocalBulletinBoardClient localClient = new LocalBulletinBoardClient(localServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_DB_NAME)); + remoteServer.init(); + LocalBulletinBoardClient remoteClient = new LocalBulletinBoardClient(remoteServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + DeletableBulletinBoardServer queueServer = new BulletinBoardSQLServer(new H2QueryProvider(QUEUE_DB_NAME)); + queueServer.init(); + LocalBulletinBoardClient queueClient = new LocalBulletinBoardClient(queueServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + List clientList = new LinkedList<>(); + clientList.add(remoteClient); + + BulletinBoardSubscriber subscriber = new ThreadedBulletinBoardSubscriber(clientList, localClient); + + cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY); + subscriptionTester = new GenericSubscriptionClientTester(cachedClient); + clientTest = new GenericBulletinBoardClientTester(cachedClient); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + cachedClient.close(); + clientTest.close(); + + } + + @Test + public void testPost() { + + clientTest.testPost(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testSubscription() throws SignatureException, CommunicationException { + +// subscriptionTester.init(); +// subscriptionTester.subscriptionTest(); +// subscriptionTester.close(); + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java index ea74885..6d02914 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -178,7 +178,7 @@ public class GenericSubscriptionClientTester { public void onFailure(Throwable t) { System.err.println(t.getCause() + " " + t.getMessage()); thrown.add(t); - jobSemaphore.release(expectedMessages.size()); + jobSemaphore.release(); stage = expectedMessages.size(); } } @@ -202,9 +202,9 @@ public class GenericSubscriptionClientTester { .build(); List> expectedMessages = new ArrayList<>(3); - expectedMessages.add(new LinkedList()); - expectedMessages.add(new LinkedList()); - expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList<>()); + expectedMessages.add(new LinkedList<>()); + expectedMessages.add(new LinkedList<>()); expectedMessages.get(0).add(msg1); expectedMessages.get(2).add(msg3); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index 56392d7..9ee1d60 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -7,12 +7,6 @@ import org.junit.Before; import org.junit.Test; import java.security.SignatureException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; - -import static org.junit.Assert.fail; /** * Created by Arbel Deutsch Peled on 05-Dec-15. @@ -30,26 +24,10 @@ public class LocalBulletinBoardClientTest { public LocalBulletinBoardClientTest() throws CommunicationException { - H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME) ; - - try { - - Connection conn = queryProvider.getDataSource().getConnection(); - Statement stmt = conn.createStatement(); - - List deletionQueries = queryProvider.getSchemaDeletionCommands(); - - for (String deletionQuery : deletionQueries) { - stmt.execute(deletionQuery); - } - - } catch (SQLException e) { - System.err.println(e.getMessage()); - throw new CommunicationException(e.getCause() + " " + e.getMessage()); - } + H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME); DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); - server.init(DB_NAME); + server.init(); LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); subscriptionTester = new GenericSubscriptionClientTester(client); @@ -102,6 +80,7 @@ public class LocalBulletinBoardClientTest { @Test public void testSubscription() throws SignatureException, CommunicationException { + subscriptionTester.init(); subscriptionTester.subscriptionTest(); subscriptionTester.close(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java new file mode 100644 index 0000000..424c4c5 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java @@ -0,0 +1,104 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.Voting.BulletinBoardClientParams; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class SingleServerBulletinBoardClientIntegrationTest { + + // Server data + + private static final String PROP_GETTY_URL = "gretty.httpBaseURI"; + private static final String DEFAULT_BASE_URL = "http://localhost:8081"; + private static final String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); + + private static final int THREAD_NUM = 3; + private static final long FAIL_DELAY = 3000; + private static final long SUBSCRIPTION_INTERVAL = 500; + + // Testers + private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; + + public SingleServerBulletinBoardClientIntegrationTest(){ + + SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(THREAD_NUM, FAIL_DELAY, SUBSCRIPTION_INTERVAL); + + List testDB = new LinkedList<>(); + testDB.add(BASE_URL); + + client.init(BulletinBoardClientParams.newBuilder() + .addAllBulletinBoardAddress(testDB) + .setMinRedundancy((float) 1.0) + .build()); + + clientTest = new GenericBulletinBoardClientTester(client); + subscriptionTester = new GenericSubscriptionClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void testPost() { + + clientTest.testPost(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testSubscription() throws SignatureException, CommunicationException { + + subscriptionTester.init(); + subscriptionTester.subscriptionTest(); + subscriptionTester.close(); + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index e9c910a..bf99259 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -368,7 +368,7 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ * This method initializes the signatures, connects to the DB and creates the schema (if required). */ @Override - public void init(String meerkatDB) throws CommunicationException { + public void init() throws CommunicationException { // TODO write signature reading part. digest = new GenericBulletinBoardDigest(new SHA256Digest()); diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index f9d3a94..8e9d161 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -44,14 +44,6 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL bulletinBoard = (BulletinBoardServer) servletContext.getAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME); } - /** - * This is the BulletinBoard init method. - */ - @Override - public void init(String meerkatDB) throws CommunicationException { - bulletinBoard.init(meerkatDB); - } - @Override public void contextInitialized(ServletContextEvent servletContextEvent) { ServletContext servletContext = servletContextEvent.getServletContext(); @@ -77,7 +69,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } try { - init(dbName); + bulletinBoard.init(); servletContext.setAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME, bulletinBoard); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java index fa96635..def0b41 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java @@ -54,7 +54,7 @@ public class H2BulletinBoardServerTest { BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java index 334620c..abc0fc6 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java @@ -58,7 +58,7 @@ public class MySQLBulletinBoardServerTest { BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java index 4dcd97b..18278b3 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java @@ -39,7 +39,7 @@ public class SQLiteBulletinBoardServerTest{ BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(new SQLiteQueryProvider(testFilename)); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 7f29608..c03051c 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -31,8 +31,9 @@ public interface BulletinBoardClient { * Check how "safe" a given message is in a synchronous manner * @param id is the unique message identifier for retrieval * @return a normalized "redundancy score" from 0 (local only) to 1 (fully published) + * @throws CommunicationException */ - float getRedundancy(MessageID id); + float getRedundancy(MessageID id) throws CommunicationException; /** * Read all messages posted matching the given filter in a synchronous manner diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index 8121b85..40f8ab3 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -22,7 +22,7 @@ public interface BulletinBoardServer{ * It also establishes the connection to the DB * @throws CommunicationException on DB connection error */ - public void init(String meerkatDB) throws CommunicationException; + public void init() throws CommunicationException; /** * Post a message to bulletin board.