Fixed some errors in the tests.

Made Threaded Client parameterized (with respect to waiting times and thread count).
Cached-Client
arbel.peled 2016-06-28 15:08:36 +03:00
parent 53d609bfee
commit 8aada21119
8 changed files with 38 additions and 26 deletions

View File

@ -45,9 +45,9 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
private long lastServerErrorTime; private long lastServerErrorTime;
private final long failDelayInMilliseconds; private final long FAIL_DELAY_IN_MILLISECONDS;
private final long subscriptionIntervalInMilliseconds; private final long SUBSCRIPTION_INTERVAL_IN_MILLISECONDS;
/** /**
* Notify the client that a job has failed * Notify the client that a job has failed
@ -82,7 +82,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
} }
try { try {
Thread.sleep(failDelayInMilliseconds); Thread.sleep(FAIL_DELAY_IN_MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
//TODO: log //TODO: log
} }
@ -108,7 +108,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
long timeSinceLastServerError = System.currentTimeMillis() - lastServerErrorTime; long timeSinceLastServerError = System.currentTimeMillis() - lastServerErrorTime;
if (timeSinceLastServerError >= failDelayInMilliseconds) { if (timeSinceLastServerError >= FAIL_DELAY_IN_MILLISECONDS) {
// Schedule for immediate processing // Schedule for immediate processing
Futures.addCallback(executorService.submit(worker), callback); Futures.addCallback(executorService.submit(worker), callback);
@ -118,7 +118,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
// Schedule for processing immediately following delay expiry // Schedule for processing immediately following delay expiry
Futures.addCallback(executorService.schedule( Futures.addCallback(executorService.schedule(
worker, worker,
failDelayInMilliseconds - timeSinceLastServerError, FAIL_DELAY_IN_MILLISECONDS - timeSinceLastServerError,
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
callback); callback);
@ -330,7 +330,7 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
RetryCallback<List<BulletinBoardMessage>> retryCallback = new RetryCallback<>(worker, this); RetryCallback<List<BulletinBoardMessage>> retryCallback = new RetryCallback<>(worker, this);
// Schedule the worker to run after the given interval has elapsed // Schedule the worker to run after the given interval has elapsed
Futures.addCallback(executorService.schedule(worker, subscriptionIntervalInMilliseconds, TimeUnit.MILLISECONDS), retryCallback); Futures.addCallback(executorService.schedule(worker, SUBSCRIPTION_INTERVAL_IN_MILLISECONDS, TimeUnit.MILLISECONDS), retryCallback);
} }
@ -352,8 +352,8 @@ public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoar
this.executorService = executorService; this.executorService = executorService;
this.failDelayInMilliseconds = failDelayInMilliseconds; this.FAIL_DELAY_IN_MILLISECONDS = failDelayInMilliseconds;
this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; this.SUBSCRIPTION_INTERVAL_IN_MILLISECONDS = subscriptionIntervalInMilliseconds;
// Set server error time to a time sufficiently in the past to make new jobs go through // Set server error time to a time sufficiently in the past to make new jobs go through
lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds; lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds;

View File

@ -36,13 +36,27 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
private final static int READ_MESSAGES_RETRY_NUM = 1; private final static int READ_MESSAGES_RETRY_NUM = 1;
private final static int GET_REDUNDANCY_RETRY_NUM = 1; private final static int GET_REDUNDANCY_RETRY_NUM = 1;
private static final int SERVER_THREADPOOL_SIZE = 5; private final int SERVER_THREADPOOL_SIZE;
private static final long FAIL_DELAY = 5000; private final long FAIL_DELAY;
private static final long SUBSCRIPTION_INTERVAL = 10000; private final long SUBSCRIPTION_INTERVAL;
private static final int DEFAULT_SERVER_THREADPOOL_SIZE = 5;
private static final long DEFAULT_FAIL_DELAY = 5000;
private static final long DEFAULT_SUBSCRIPTION_INTERVAL = 10000;
private int minAbsoluteRedundancy; private int minAbsoluteRedundancy;
public ThreadedBulletinBoardClient(int serverThreadpoolSize, long failDelay, long subscriptionInterval) {
SERVER_THREADPOOL_SIZE = serverThreadpoolSize;
FAIL_DELAY = failDelay;
SUBSCRIPTION_INTERVAL = subscriptionInterval;
}
public ThreadedBulletinBoardClient() {
this(DEFAULT_SERVER_THREADPOOL_SIZE, DEFAULT_FAIL_DELAY, DEFAULT_SUBSCRIPTION_INTERVAL);
}
/** /**
* Stores database locations and initializes the web Client * Stores database locations and initializes the web Client
* Stores the required minimum redundancy. * Stores the required minimum redundancy.
@ -232,11 +246,9 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
throw new IllegalArgumentException("Message is not a stub and does not contain the required message ID"); throw new IllegalArgumentException("Message is not a stub and does not contain the required message ID");
} }
MessageID msgID = MessageID.newBuilder().setID(stub.getMsg().getMsgId()).build();
// Create job // Create job
MultiServerReadBatchDataWorker worker = MultiServerReadBatchDataWorker worker =
new MultiServerReadBatchDataWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback); new MultiServerReadBatchDataWorker(clients, minAbsoluteRedundancy, stub, READ_MESSAGES_RETRY_NUM, callback);
// Submit job // Submit job
executorService.submit(worker); executorService.submit(worker);

View File

@ -10,10 +10,10 @@ import java.util.List;
/** /**
* Created by Arbel Deutsch Peled on 27-Dec-15. * Created by Arbel Deutsch Peled on 27-Dec-15.
*/ */
public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker<MessageID, BulletinBoardMessage> { public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker<BulletinBoardMessage, BulletinBoardMessage> {
public MultiServerReadBatchDataWorker(List<SingleServerBulletinBoardClient> clients, public MultiServerReadBatchDataWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, MessageID payload, int maxRetry, int minServers, BulletinBoardMessage payload, int maxRetry,
FutureCallback<BulletinBoardMessage> futureCallback) { FutureCallback<BulletinBoardMessage> futureCallback) {
super(clients, minServers, payload, maxRetry, futureCallback); super(clients, minServers, payload, maxRetry, futureCallback);
@ -21,8 +21,8 @@ public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker
} }
@Override @Override
protected void doRead(MessageID payload, SingleServerBulletinBoardClient client) { protected void doRead(BulletinBoardMessage payload, SingleServerBulletinBoardClient client) {
client.readMessage(payload, this); client.readBatchData(payload, this);
} }

View File

@ -51,7 +51,7 @@ public class CachedBulletinBoardClientTest {
cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY); cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY);
subscriptionTester = new GenericSubscriptionClientTester(cachedClient); subscriptionTester = new GenericSubscriptionClientTester(cachedClient);
clientTest = new GenericBulletinBoardClientTester(cachedClient); clientTest = new GenericBulletinBoardClientTester(cachedClient, 87351);
} }

View File

@ -48,7 +48,6 @@ public class GenericBulletinBoardClientTester {
private AsyncBulletinBoardClient bulletinBoardClient; private AsyncBulletinBoardClient bulletinBoardClient;
private PostCallback postCallback; private PostCallback postCallback;
private PostCallback failPostCallback = new PostCallback(true,false);
private RedundancyCallback redundancyCallback; private RedundancyCallback redundancyCallback;
private ReadCallback readCallback; private ReadCallback readCallback;
@ -64,7 +63,7 @@ public class GenericBulletinBoardClientTester {
// Constructor // Constructor
public GenericBulletinBoardClientTester(AsyncBulletinBoardClient bulletinBoardClient){ public GenericBulletinBoardClientTester(AsyncBulletinBoardClient bulletinBoardClient, int seed){
this.bulletinBoardClient = bulletinBoardClient; this.bulletinBoardClient = bulletinBoardClient;
@ -113,7 +112,7 @@ public class GenericBulletinBoardClientTester {
fail("Couldn't find signing key " + e.getMessage()); fail("Couldn't find signing key " + e.getMessage());
} }
this.random = new Random(0); this.random = new Random(seed);
this.generator = new BulletinBoardMessageGenerator(random); this.generator = new BulletinBoardMessageGenerator(random);
this.digest = new GenericBulletinBoardDigest(new SHA256Digest()); this.digest = new GenericBulletinBoardDigest(new SHA256Digest());

View File

@ -31,7 +31,7 @@ public class LocalBulletinBoardClientTest {
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY);
subscriptionTester = new GenericSubscriptionClientTester(client); subscriptionTester = new GenericSubscriptionClientTester(client);
clientTest = new GenericBulletinBoardClientTester(client); clientTest = new GenericBulletinBoardClientTester(client, 98354);
} }

View File

@ -44,7 +44,7 @@ public class SingleServerBulletinBoardClientIntegrationTest {
.setMinRedundancy((float) 1.0) .setMinRedundancy((float) 1.0)
.build()); .build());
clientTest = new GenericBulletinBoardClientTester(client); clientTest = new GenericBulletinBoardClientTester(client, 981541);
subscriptionTester = new GenericSubscriptionClientTester(client); subscriptionTester = new GenericSubscriptionClientTester(client);
} }

View File

@ -28,7 +28,7 @@ public class ThreadedBulletinBoardClientIntegrationTest {
public ThreadedBulletinBoardClientIntegrationTest(){ public ThreadedBulletinBoardClientIntegrationTest(){
ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(); ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(3,0,500);
List<String> testDB = new LinkedList<>(); List<String> testDB = new LinkedList<>();
testDB.add(BASE_URL); testDB.add(BASE_URL);
@ -38,7 +38,7 @@ public class ThreadedBulletinBoardClientIntegrationTest {
.setMinRedundancy((float) 1.0) .setMinRedundancy((float) 1.0)
.build()); .build());
clientTest = new GenericBulletinBoardClientTester(client); clientTest = new GenericBulletinBoardClientTester(client, 52351);
} }
@ -76,6 +76,7 @@ public class ThreadedBulletinBoardClientIntegrationTest {
public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException {
clientTest.testBatchPost(); clientTest.testBatchPost();
} }
@Test @Test