diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index be75dfe..e732059 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -45,9 +45,9 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar private long lastServerErrorTime; - private final long failDelayInMilliseconds; + private final long FAIL_DELAY_IN_MILLISECONDS; - private final long subscriptionIntervalInMilliseconds; + private final long SUBSCRIPTION_INTERVAL_IN_MILLISECONDS; /** * Notify the client that a job has failed @@ -82,7 +82,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar } try { - Thread.sleep(failDelayInMilliseconds); + Thread.sleep(FAIL_DELAY_IN_MILLISECONDS); } catch (InterruptedException e) { //TODO: log } @@ -108,7 +108,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar long timeSinceLastServerError = System.currentTimeMillis() - lastServerErrorTime; - if (timeSinceLastServerError >= failDelayInMilliseconds) { + if (timeSinceLastServerError >= FAIL_DELAY_IN_MILLISECONDS) { // Schedule for immediate processing Futures.addCallback(executorService.submit(worker), callback); @@ -118,7 +118,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar // Schedule for processing immediately following delay expiry Futures.addCallback(executorService.schedule( worker, - failDelayInMilliseconds - timeSinceLastServerError, + FAIL_DELAY_IN_MILLISECONDS - timeSinceLastServerError, TimeUnit.MILLISECONDS), callback); @@ -330,7 +330,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar RetryCallback> retryCallback = new RetryCallback<>(worker, this); // Schedule the worker to run after the given interval has elapsed - Futures.addCallback(executorService.schedule(worker, subscriptionIntervalInMilliseconds, TimeUnit.MILLISECONDS), retryCallback); + Futures.addCallback(executorService.schedule(worker, SUBSCRIPTION_INTERVAL_IN_MILLISECONDS, TimeUnit.MILLISECONDS), retryCallback); } @@ -352,8 +352,8 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar this.executorService = executorService; - this.failDelayInMilliseconds = failDelayInMilliseconds; - this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; + this.FAIL_DELAY_IN_MILLISECONDS = failDelayInMilliseconds; + this.SUBSCRIPTION_INTERVAL_IN_MILLISECONDS = subscriptionIntervalInMilliseconds; // Set server error time to a time sufficiently in the past to make new jobs go through lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds; diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index b549a2e..9dd85b9 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -36,13 +36,27 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple private final static int READ_MESSAGES_RETRY_NUM = 1; private final static int GET_REDUNDANCY_RETRY_NUM = 1; - private static final int SERVER_THREADPOOL_SIZE = 5; - private static final long FAIL_DELAY = 5000; - private static final long SUBSCRIPTION_INTERVAL = 10000; + private final int SERVER_THREADPOOL_SIZE; + private final long FAIL_DELAY; + private final long SUBSCRIPTION_INTERVAL; + + private static final int DEFAULT_SERVER_THREADPOOL_SIZE = 5; + private static final long DEFAULT_FAIL_DELAY = 5000; + private static final long DEFAULT_SUBSCRIPTION_INTERVAL = 10000; private int minAbsoluteRedundancy; + public ThreadedBulletinBoardClient(int serverThreadpoolSize, long failDelay, long subscriptionInterval) { + SERVER_THREADPOOL_SIZE = serverThreadpoolSize; + FAIL_DELAY = failDelay; + SUBSCRIPTION_INTERVAL = subscriptionInterval; + } + + public ThreadedBulletinBoardClient() { + this(DEFAULT_SERVER_THREADPOOL_SIZE, DEFAULT_FAIL_DELAY, DEFAULT_SUBSCRIPTION_INTERVAL); + } + /** * Stores database locations and initializes the web Client * Stores the required minimum redundancy. @@ -232,11 +246,9 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple throw new IllegalArgumentException("Message is not a stub and does not contain the required message ID"); } - MessageID msgID = MessageID.newBuilder().setID(stub.getMsg().getMsgId()).build(); - // Create job MultiServerReadBatchDataWorker worker = - new MultiServerReadBatchDataWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback); + new MultiServerReadBatchDataWorker(clients, minAbsoluteRedundancy, stub, READ_MESSAGES_RETRY_NUM, callback); // Submit job executorService.submit(worker); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java index ab5fd40..769702a 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/multiserver/MultiServerReadBatchDataWorker.java @@ -10,10 +10,10 @@ import java.util.List; /** * Created by Arbel Deutsch Peled on 27-Dec-15. */ -public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker { +public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker { public MultiServerReadBatchDataWorker(List clients, - int minServers, MessageID payload, int maxRetry, + int minServers, BulletinBoardMessage payload, int maxRetry, FutureCallback futureCallback) { super(clients, minServers, payload, maxRetry, futureCallback); @@ -21,8 +21,8 @@ public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker } @Override - protected void doRead(MessageID payload, SingleServerBulletinBoardClient client) { - client.readMessage(payload, this); + protected void doRead(BulletinBoardMessage payload, SingleServerBulletinBoardClient client) { + client.readBatchData(payload, this); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java index 83e62ff..8bae8c2 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/CachedBulletinBoardClientTest.java @@ -51,7 +51,7 @@ public class CachedBulletinBoardClientTest { cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY); subscriptionTester = new GenericSubscriptionClientTester(cachedClient); - clientTest = new GenericBulletinBoardClientTester(cachedClient); + clientTest = new GenericBulletinBoardClientTester(cachedClient, 87351); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java index f7a4653..7ded2e4 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java @@ -48,7 +48,6 @@ public class GenericBulletinBoardClientTester { private AsyncBulletinBoardClient bulletinBoardClient; private PostCallback postCallback; - private PostCallback failPostCallback = new PostCallback(true,false); private RedundancyCallback redundancyCallback; private ReadCallback readCallback; @@ -64,7 +63,7 @@ public class GenericBulletinBoardClientTester { // Constructor - public GenericBulletinBoardClientTester(AsyncBulletinBoardClient bulletinBoardClient){ + public GenericBulletinBoardClientTester(AsyncBulletinBoardClient bulletinBoardClient, int seed){ this.bulletinBoardClient = bulletinBoardClient; @@ -113,7 +112,7 @@ public class GenericBulletinBoardClientTester { fail("Couldn't find signing key " + e.getMessage()); } - this.random = new Random(0); + this.random = new Random(seed); this.generator = new BulletinBoardMessageGenerator(random); this.digest = new GenericBulletinBoardDigest(new SHA256Digest()); diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index 9ee1d60..d33d5ba 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -31,7 +31,7 @@ public class LocalBulletinBoardClientTest { LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); subscriptionTester = new GenericSubscriptionClientTester(client); - clientTest = new GenericBulletinBoardClientTester(client); + clientTest = new GenericBulletinBoardClientTester(client, 98354); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java index 424c4c5..7c8d58b 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/SingleServerBulletinBoardClientIntegrationTest.java @@ -44,7 +44,7 @@ public class SingleServerBulletinBoardClientIntegrationTest { .setMinRedundancy((float) 1.0) .build()); - clientTest = new GenericBulletinBoardClientTester(client); + clientTest = new GenericBulletinBoardClientTester(client, 981541); subscriptionTester = new GenericSubscriptionClientTester(client); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java index dd47354..1187245 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java @@ -28,7 +28,7 @@ public class ThreadedBulletinBoardClientIntegrationTest { public ThreadedBulletinBoardClientIntegrationTest(){ - ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(); + ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(3,0,500); List testDB = new LinkedList<>(); testDB.add(BASE_URL); @@ -38,7 +38,7 @@ public class ThreadedBulletinBoardClientIntegrationTest { .setMinRedundancy((float) 1.0) .build()); - clientTest = new GenericBulletinBoardClientTester(client); + clientTest = new GenericBulletinBoardClientTester(client, 52351); } @@ -76,6 +76,7 @@ public class ThreadedBulletinBoardClientIntegrationTest { public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { clientTest.testBatchPost(); + } @Test