diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index 6594b0c..cdb86cb 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -23,12 +23,16 @@ import java.util.List; public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient { private final AsyncBulletinBoardClient localClient; - private AsyncBulletinBoardClient remoteClient; - private BulletinBoardSubscriber subscriber; - private BulletinBoardSynchronizer synchronizer; + private final AsyncBulletinBoardClient remoteClient; + private final AsyncBulletinBoardClient queueClient; + private final BulletinBoardSubscriber subscriber; + private final BulletinBoardSynchronizer synchronizer; private Thread syncThread; + private final static int DEFAULT_WAIT_CAP = 3000; + private final static int DEFAULT_SLEEP_INTERVAL = 3000; + private class SubscriptionStoreCallback implements FutureCallback> { private final FutureCallback> callback; @@ -81,21 +85,36 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien * @param localClient is a Client for the local instance * @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations * @param subscriber is a subscription service to the remote instance(s) - * @param queue is a client for a local deletable server to be used as a queue for not-yet-uploaded messages + * @param queueClient is a client for a local deletable server to be used as a queue for not-yet-uploaded messages */ + public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, + AsyncBulletinBoardClient remoteClient, + BulletinBoardSubscriber subscriber, + DeletableSubscriptionBulletinBoardClient queueClient, + int sleepInterval, + int waitCap) { + + this.localClient = localClient; + this.remoteClient = remoteClient; + this.subscriber = subscriber; + this.queueClient = queueClient; + + this.synchronizer = new SimpleBulletinBoardSynchronizer(sleepInterval,waitCap); + synchronizer.init(queueClient, remoteClient); + syncThread = new Thread(synchronizer); + syncThread.start(); + } + + /** + * Creates a Cached Client + * Used default values foe the time caps + * */ public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, BulletinBoardSubscriber subscriber, DeletableSubscriptionBulletinBoardClient queue) { - this.localClient = localClient; - this.remoteClient = remoteClient; - this.subscriber = subscriber; - - this.synchronizer = new SimpleBulletinBoardSynchronizer(); - synchronizer.init(queue, remoteClient); - syncThread = new Thread(synchronizer); - syncThread.start(); + this(localClient, remoteClient, subscriber, queue, DEFAULT_SLEEP_INTERVAL, DEFAULT_WAIT_CAP); } @Override @@ -385,7 +404,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien } @Override - public float getRedundancy(MessageID id) { + public float getRedundancy(MessageID id) throws CommunicationException { return remoteClient.getRedundancy(id); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 521815d..a274f53 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -68,16 +68,11 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ // Post message to all databases try { for (String db : meerkatDBs) { - webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH); - response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF)); - // Only consider valid responses - if (response.getStatusInfo() == Response.Status.OK - || response.getStatusInfo() == Response.Status.CREATED) { - response.readEntity(BoolValue.class).getValue(); - } else { - throw new CommunicationException("Server returned error. Status was: " + response.getStatus()); - } + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(db, msg, 0); + + worker.call(); + } } catch (Exception e) { // Occurs only when server replies with valid status but invalid data throw new CommunicationException("Error accessing database: " + e.getMessage()); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index 86ede70..be75dfe 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -8,15 +8,15 @@ import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; -import meerkat.protobuf.BulletinBoardAPI; +import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.util.BulletinBoardUtils; +import javax.ws.rs.client.Client; import java.lang.Iterable; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -31,11 +31,17 @@ import java.util.concurrent.atomic.AtomicInteger; * If the list of servers contains more than one server: the server actually used is the first one * The class further implements a delayed access to the server after a communication error occurs */ -public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient { +public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoardClient { + + protected Client client; + + protected BulletinBoardDigest digest; + + private String dbAddress; private final int MAX_RETRIES = 11; - private ListeningScheduledExecutorService executorService; + private final ListeningScheduledExecutorService executorService; private long lastServerErrorTime; @@ -54,6 +60,43 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i } + private class SynchronousRetry { + + private final SingleServerWorker worker; + + private String thrown; + + public SynchronousRetry(SingleServerWorker worker) { + this.worker = worker; + this.thrown = "Could not contact server. Errors follow:\n"; + } + + OUT run() throws CommunicationException { + + do { + + try { + return worker.call(); + } catch (Exception e) { + thrown += e.getCause() + " " + e.getMessage() + "\n"; + } + + try { + Thread.sleep(failDelayInMilliseconds); + } catch (InterruptedException e) { + //TODO: log + } + + worker.decMaxRetry(); + + } while (worker.isRetry()); + + throw new CommunicationException(thrown); + + } + + } + /** * This method adds a worker to the scheduled queue of the threadpool * If the server is in an accessible state: the job is submitted for immediate handling @@ -225,7 +268,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .build()) .build(); - SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); scheduleWorker(batchWorker, new ReadBatchCallback(msg, callback)); @@ -266,20 +309,28 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i if (callback != null) callback.onSuccess(result); - // Remove last filter from list (MIN_ENTRY one) - filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + // Update filter if needed - // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) - filterBuilder.addFilter(MessageFilter.newBuilder() - .setType(FilterType.MIN_ENTRY) - .setEntry(result.get(result.size() - 1).getEntryNum() + 1) - .build()); + if (result.size() > 0) { + + // Remove last filter from list (MIN_ENTRY one) + filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + + // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) + filterBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(result.get(result.size() - 1).getEntryNum() + 1) + .build()); + + } // Create new worker with updated task - worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), 1); + worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), MAX_RETRIES); - // Schedule the worker - scheduleWorker(worker, this); + RetryCallback> retryCallback = new RetryCallback<>(worker, this); + + // Schedule the worker to run after the given interval has elapsed + Futures.addCallback(executorService.schedule(worker, subscriptionIntervalInMilliseconds, TimeUnit.MILLISECONDS), retryCallback); } @@ -324,21 +375,192 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void init(BulletinBoardClientParams clientParams) { - // Perform usual setup - super.init(clientParams); + this.digest = new GenericBulletinBoardDigest(new SHA256Digest()); // Remove all but first DB address - String dbAddress = meerkatDBs.get(0); - meerkatDBs = new LinkedList<>(); - meerkatDBs.add(dbAddress); + this.dbAddress = clientParams.getBulletinBoardAddress(0); } + // Synchronous methods + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + retry.run(); + + digest.reset(); + digest.update(msg); + + return digest.digestAsMessageID(); + + } + + @Override + public float getRedundancy(MessageID id) throws CommunicationException { + + SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + @Override + public List readMessages(MessageFilterList filterList) throws CommunicationException { + + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); + + SynchronousRetry> retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + @Override + public MessageID postAsBatch(BulletinBoardMessage msg, int chunkSize) throws CommunicationException { + + // Begin the batch and obtain identifier + + BeginBatchMessage beginBatchMessage = BeginBatchMessage.newBuilder() + .addAllTag(msg.getMsg().getTagList()) + .build(); + + SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES); + + SynchronousRetry beginRetry = new SynchronousRetry<>(beginBatchWorker); + + Int64Value identifier = beginRetry.run(); + + // Post data chunks + + List batchChunkList = BulletinBoardUtils.breakToBatch(msg, chunkSize); + + BatchMessage.Builder builder = BatchMessage.newBuilder().setBatchId(identifier.getValue()); + + int position = 0; + + for (BatchChunk data : batchChunkList) { + + builder.setSerialNum(position).setData(data); + + SingleServerPostBatchWorker dataWorker = new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES); + + SynchronousRetry dataRetry = new SynchronousRetry<>(dataWorker); + + dataRetry.run(); + + // Increment position in batch + position++; + + } + + // Close batch + + CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() + .setBatchId(identifier.getValue()) + .addAllSig(msg.getSigList()) + .setTimestamp(msg.getMsg().getTimestamp()) + .setBatchLength(position) + .build(); + + SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(closeBatchWorker); + + retry.run(); + + // Calculate ID and return + + digest.reset(); + digest.update(msg); + + return digest.digestAsMessageID(); + + } + + @Override + public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException { + + // Retrieve message (which may be a stub) + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgID.getID()) + .build()) + .build(); + + SingleServerReadMessagesWorker stubWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); + + SynchronousRetry> retry = new SynchronousRetry<>(stubWorker); + + List messages = retry.run(); + + if (messages.size() <= 0) { + throw new CommunicationException("Could not find message in database."); + } + + BulletinBoardMessage msg = messages.get(0); + + if (msg.getMsg().getDataTypeCase() != UnsignedBulletinBoardMessage.DataTypeCase.MSGID) { + + // We retrieved a complete message. Return it. + return msg; + + } else { + + // We retrieved a stub. Retrieve data. + return readBatchData(msg); + + } + + } + + @Override + public BulletinBoardMessage readBatchData(BulletinBoardMessage stub) throws CommunicationException, IllegalArgumentException { + + BatchQuery batchQuery = BatchQuery.newBuilder() + .setMsgID(MessageID.newBuilder() + .setID(stub.getMsg().getMsgId()) + .build()) + .setStartPosition(0) + .build(); + + SingleServerReadBatchWorker readBatchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); + + SynchronousRetry> batchRetry = new SynchronousRetry<>(readBatchWorker); + + List batchChunkList = batchRetry.run(); + + return BulletinBoardUtils.gatherBatch(stub, batchChunkList); + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException { + + SingleServerGenerateSyncQueryWorker worker = + new SingleServerGenerateSyncQueryWorker(dbAddress, generateSyncQueryParams, MAX_RETRIES); + + SynchronousRetry retry = new SynchronousRetry<>(worker); + + return retry.run(); + + } + + // Asynchronous methods + @Override public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { // Create worker with redundancy 1 and MAX_RETRIES retries - SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES); + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -453,7 +675,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerBeginBatchWorker worker = - new SingleServerBeginBatchWorker(meerkatDBs.get(0), beginBatchMessage, MAX_RETRIES); + new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, new BeginBatchCallback(callback))); @@ -491,7 +713,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerPostBatchWorker worker = - new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES); + new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES); // Create worker with redundancy 1 and MAX_RETRIES retries scheduleWorker(worker, new RetryCallback<>(worker, listCallback)); @@ -529,7 +751,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i // Create worker with redundancy 1 and MAX_RETRIES retries SingleServerCloseBatchWorker worker = - new SingleServerCloseBatchWorker(meerkatDBs.get(0), closeBatchMessage, MAX_RETRIES); + new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES); // Submit worker and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -540,7 +762,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void getRedundancy(MessageID id, FutureCallback callback) { // Create worker with no retries - SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1); + SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, 1); // Submit job and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -551,7 +773,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i public void readMessages(MessageFilterList filterList, FutureCallback> callback) { // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, 1); // Submit job and create callback scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -575,7 +797,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .setStartPosition(0) .build(); - SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, MAX_RETRIES); + SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES); // Submit jobs with wrapped callbacks scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, new CompleteMessageReadCallback(callback))); @@ -598,7 +820,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .setStartPosition(0) .build(); - SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES); + SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES); scheduleWorker(batchWorker, new RetryCallback<>(batchWorker, new ReadBatchCallback(stub, callback))); @@ -607,7 +829,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void querySync(SyncQuery syncQuery, FutureCallback callback) { - SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(meerkatDBs.get(0), syncQuery, MAX_RETRIES); + SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(dbAddress, syncQuery, MAX_RETRIES); scheduleWorker(worker, new RetryCallback<>(worker, callback)); @@ -633,7 +855,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i .build()); // Create job with no retries - SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), MAX_RETRIES); + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterListBuilder.build(), MAX_RETRIES); // Submit job and create callback that retries on failure and handles repeated subscription scheduleWorker(worker, new RetryCallback<>(worker, new SubscriptionCallback(worker, callback))); @@ -647,8 +869,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void close() { - super.close(); - executorService.shutdown(); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java index 9568255..252d6e3 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -250,7 +250,6 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber super(filterList, callback); } - @Override public void onSuccess(List result) { @@ -274,5 +273,4 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber subscribe(filterList, 0, callback); } - } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java new file mode 100644 index 0000000..71d90e0 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGenerateSyncQueryWorker.java @@ -0,0 +1,55 @@ +package meerkat.bulletinboard.workers.singleserver; + +import com.google.protobuf.Int64Value; +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.SyncQuery; +import meerkat.protobuf.BulletinBoardAPI.GenerateSyncQueryParams; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.GENERATE_SYNC_QUERY_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a Sync Query Generation operation + */ +public class SingleServerGenerateSyncQueryWorker extends SingleServerWorker { + + public SingleServerGenerateSyncQueryWorker(String serverAddress, GenerateSyncQueryParams payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + @Override + public SyncQuery call() throws Exception { + + Client client = clientLocal.get(); + + WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(GENERATE_SYNC_QUERY_PATH); + + Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF)); + + try { + + SyncQuery result = response.readEntity(SyncQuery.class); + return result; + + } catch (ProcessingException | IllegalStateException e) { + + // Post to this server failed + throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage()); + + } catch (Exception e) { + throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage()); + } + finally { + response.close(); + } + } +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java index 588f529..19e7ae5 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java @@ -114,7 +114,7 @@ public class BulletinBoardSynchronizerTest { public void init() throws CommunicationException { DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount)); - remoteServer.init(REMOTE_SERVER_ADDRESS); + remoteServer.init(); remoteClient = new LocalBulletinBoardClient( remoteServer, @@ -122,7 +122,7 @@ public class BulletinBoardSynchronizerTest { SUBSCRIPTION_INTERVAL); DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount)); - localServer.init(LOCAL_SERVER_ADDRESS); + localServer.init(); localClient = new LocalBulletinBoardClient( localServer, diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java new file mode 100644 index 0000000..83e62ff --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java @@ -0,0 +1,111 @@ +package meerkat.bulletinboard; + +import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer; +import meerkat.bulletinboard.sqlserver.H2QueryProvider; +import meerkat.comm.CommunicationException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel on 6/27/2016. + */ +public class CachedBulletinBoardClientTest { + + private static final int THREAD_NUM = 3; + + private static final String LOCAL_DB_NAME = "localDB"; + private static final String REMOTE_DB_NAME = "remoteDB"; + private static final String QUEUE_DB_NAME = "queueDB"; + + private static final int SUBSRCIPTION_DELAY = 500; + private static final int SYNC_DELAY = 500; + + // Testers + private CachedBulletinBoardClient cachedClient; + private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; + + public CachedBulletinBoardClientTest() throws CommunicationException { + + DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_DB_NAME)); + localServer.init(); + LocalBulletinBoardClient localClient = new LocalBulletinBoardClient(localServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_DB_NAME)); + remoteServer.init(); + LocalBulletinBoardClient remoteClient = new LocalBulletinBoardClient(remoteServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + DeletableBulletinBoardServer queueServer = new BulletinBoardSQLServer(new H2QueryProvider(QUEUE_DB_NAME)); + queueServer.init(); + LocalBulletinBoardClient queueClient = new LocalBulletinBoardClient(queueServer, THREAD_NUM, SUBSRCIPTION_DELAY); + + List clientList = new LinkedList<>(); + clientList.add(remoteClient); + + BulletinBoardSubscriber subscriber = new ThreadedBulletinBoardSubscriber(clientList, localClient); + + cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY); + subscriptionTester = new GenericSubscriptionClientTester(cachedClient); + clientTest = new GenericBulletinBoardClientTester(cachedClient); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + cachedClient.close(); + clientTest.close(); + + } + + @Test + public void testPost() { + + clientTest.testPost(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testSubscription() throws SignatureException, CommunicationException { + +// subscriptionTester.init(); +// subscriptionTester.subscriptionTest(); +// subscriptionTester.close(); + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java index ea74885..6d02914 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -178,7 +178,7 @@ public class GenericSubscriptionClientTester { public void onFailure(Throwable t) { System.err.println(t.getCause() + " " + t.getMessage()); thrown.add(t); - jobSemaphore.release(expectedMessages.size()); + jobSemaphore.release(); stage = expectedMessages.size(); } } @@ -202,9 +202,9 @@ public class GenericSubscriptionClientTester { .build(); List> expectedMessages = new ArrayList<>(3); - expectedMessages.add(new LinkedList()); - expectedMessages.add(new LinkedList()); - expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList<>()); + expectedMessages.add(new LinkedList<>()); + expectedMessages.add(new LinkedList<>()); expectedMessages.get(0).add(msg1); expectedMessages.get(2).add(msg3); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index 56392d7..9ee1d60 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -7,12 +7,6 @@ import org.junit.Before; import org.junit.Test; import java.security.SignatureException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; - -import static org.junit.Assert.fail; /** * Created by Arbel Deutsch Peled on 05-Dec-15. @@ -30,26 +24,10 @@ public class LocalBulletinBoardClientTest { public LocalBulletinBoardClientTest() throws CommunicationException { - H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME) ; - - try { - - Connection conn = queryProvider.getDataSource().getConnection(); - Statement stmt = conn.createStatement(); - - List deletionQueries = queryProvider.getSchemaDeletionCommands(); - - for (String deletionQuery : deletionQueries) { - stmt.execute(deletionQuery); - } - - } catch (SQLException e) { - System.err.println(e.getMessage()); - throw new CommunicationException(e.getCause() + " " + e.getMessage()); - } + H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME); DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); - server.init(DB_NAME); + server.init(); LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); subscriptionTester = new GenericSubscriptionClientTester(client); @@ -102,6 +80,7 @@ public class LocalBulletinBoardClientTest { @Test public void testSubscription() throws SignatureException, CommunicationException { + subscriptionTester.init(); subscriptionTester.subscriptionTest(); subscriptionTester.close(); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java new file mode 100644 index 0000000..424c4c5 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java @@ -0,0 +1,104 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.Voting.BulletinBoardClientParams; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class SingleServerBulletinBoardClientIntegrationTest { + + // Server data + + private static final String PROP_GETTY_URL = "gretty.httpBaseURI"; + private static final String DEFAULT_BASE_URL = "http://localhost:8081"; + private static final String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); + + private static final int THREAD_NUM = 3; + private static final long FAIL_DELAY = 3000; + private static final long SUBSCRIPTION_INTERVAL = 500; + + // Testers + private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; + + public SingleServerBulletinBoardClientIntegrationTest(){ + + SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(THREAD_NUM, FAIL_DELAY, SUBSCRIPTION_INTERVAL); + + List testDB = new LinkedList<>(); + testDB.add(BASE_URL); + + client.init(BulletinBoardClientParams.newBuilder() + .addAllBulletinBoardAddress(testDB) + .setMinRedundancy((float) 1.0) + .build()); + + clientTest = new GenericBulletinBoardClientTester(client); + subscriptionTester = new GenericSubscriptionClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void testPost() { + + clientTest.testPost(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testSubscription() throws SignatureException, CommunicationException { + + subscriptionTester.init(); + subscriptionTester.subscriptionTest(); + subscriptionTester.close(); + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index e9c910a..bf99259 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -368,7 +368,7 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ * This method initializes the signatures, connects to the DB and creates the schema (if required). */ @Override - public void init(String meerkatDB) throws CommunicationException { + public void init() throws CommunicationException { // TODO write signature reading part. digest = new GenericBulletinBoardDigest(new SHA256Digest()); diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index f9d3a94..8e9d161 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -44,14 +44,6 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL bulletinBoard = (BulletinBoardServer) servletContext.getAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME); } - /** - * This is the BulletinBoard init method. - */ - @Override - public void init(String meerkatDB) throws CommunicationException { - bulletinBoard.init(meerkatDB); - } - @Override public void contextInitialized(ServletContextEvent servletContextEvent) { ServletContext servletContext = servletContextEvent.getServletContext(); @@ -77,7 +69,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } try { - init(dbName); + bulletinBoard.init(); servletContext.setAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME, bulletinBoard); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java index fa96635..def0b41 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/H2BulletinBoardServerTest.java @@ -54,7 +54,7 @@ public class H2BulletinBoardServerTest { BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java index 334620c..abc0fc6 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java @@ -58,7 +58,7 @@ public class MySQLBulletinBoardServerTest { BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java index 4dcd97b..18278b3 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteBulletinBoardServerTest.java @@ -39,7 +39,7 @@ public class SQLiteBulletinBoardServerTest{ BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(new SQLiteQueryProvider(testFilename)); try { - bulletinBoardServer.init(""); + bulletinBoardServer.init(); } catch (CommunicationException e) { System.err.println(e.getMessage()); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 7f29608..c03051c 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -31,8 +31,9 @@ public interface BulletinBoardClient { * Check how "safe" a given message is in a synchronous manner * @param id is the unique message identifier for retrieval * @return a normalized "redundancy score" from 0 (local only) to 1 (fully published) + * @throws CommunicationException */ - float getRedundancy(MessageID id); + float getRedundancy(MessageID id) throws CommunicationException; /** * Read all messages posted matching the given filter in a synchronous manner diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java index 8121b85..40f8ab3 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardServer.java @@ -22,7 +22,7 @@ public interface BulletinBoardServer{ * It also establishes the connection to the DB * @throws CommunicationException on DB connection error */ - public void init(String meerkatDB) throws CommunicationException; + public void init() throws CommunicationException; /** * Post a message to bulletin board.