Removed database address from Bulletin Board Server init method (the address is given to the Query Provider).
Added integration tests for Single Server Bulletin Board Client. Fixed subscription logic in the Single Server Bulletin Board Client. Decoupled Single Server- and Simple- Bulletin Board Clients (Simple-... is now obsolete). Fixed some bugs. Threaded Bulletin Board Client now has some errors in integration.Cached-Client
parent
cc2888483d
commit
53d609bfee
|
@ -23,12 +23,16 @@ import java.util.List;
|
|||
public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient {
|
||||
|
||||
private final AsyncBulletinBoardClient localClient;
|
||||
private AsyncBulletinBoardClient remoteClient;
|
||||
private BulletinBoardSubscriber subscriber;
|
||||
private BulletinBoardSynchronizer synchronizer;
|
||||
private final AsyncBulletinBoardClient remoteClient;
|
||||
private final AsyncBulletinBoardClient queueClient;
|
||||
private final BulletinBoardSubscriber subscriber;
|
||||
private final BulletinBoardSynchronizer synchronizer;
|
||||
|
||||
private Thread syncThread;
|
||||
|
||||
private final static int DEFAULT_WAIT_CAP = 3000;
|
||||
private final static int DEFAULT_SLEEP_INTERVAL = 3000;
|
||||
|
||||
private class SubscriptionStoreCallback implements FutureCallback<List<BulletinBoardMessage>> {
|
||||
|
||||
private final FutureCallback<List<BulletinBoardMessage>> callback;
|
||||
|
@ -81,21 +85,36 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
|
|||
* @param localClient is a Client for the local instance
|
||||
* @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations
|
||||
* @param subscriber is a subscription service to the remote instance(s)
|
||||
* @param queue is a client for a local deletable server to be used as a queue for not-yet-uploaded messages
|
||||
* @param queueClient is a client for a local deletable server to be used as a queue for not-yet-uploaded messages
|
||||
*/
|
||||
public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient,
|
||||
AsyncBulletinBoardClient remoteClient,
|
||||
BulletinBoardSubscriber subscriber,
|
||||
DeletableSubscriptionBulletinBoardClient queueClient,
|
||||
int sleepInterval,
|
||||
int waitCap) {
|
||||
|
||||
this.localClient = localClient;
|
||||
this.remoteClient = remoteClient;
|
||||
this.subscriber = subscriber;
|
||||
this.queueClient = queueClient;
|
||||
|
||||
this.synchronizer = new SimpleBulletinBoardSynchronizer(sleepInterval,waitCap);
|
||||
synchronizer.init(queueClient, remoteClient);
|
||||
syncThread = new Thread(synchronizer);
|
||||
syncThread.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Cached Client
|
||||
* Used default values foe the time caps
|
||||
* */
|
||||
public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient,
|
||||
AsyncBulletinBoardClient remoteClient,
|
||||
BulletinBoardSubscriber subscriber,
|
||||
DeletableSubscriptionBulletinBoardClient queue) {
|
||||
|
||||
this.localClient = localClient;
|
||||
this.remoteClient = remoteClient;
|
||||
this.subscriber = subscriber;
|
||||
|
||||
this.synchronizer = new SimpleBulletinBoardSynchronizer();
|
||||
synchronizer.init(queue, remoteClient);
|
||||
syncThread = new Thread(synchronizer);
|
||||
syncThread.start();
|
||||
this(localClient, remoteClient, subscriber, queue, DEFAULT_SLEEP_INTERVAL, DEFAULT_WAIT_CAP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -385,7 +404,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
|
|||
}
|
||||
|
||||
@Override
|
||||
public float getRedundancy(MessageID id) {
|
||||
public float getRedundancy(MessageID id) throws CommunicationException {
|
||||
return remoteClient.getRedundancy(id);
|
||||
}
|
||||
|
||||
|
|
|
@ -68,16 +68,11 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
|||
// Post message to all databases
|
||||
try {
|
||||
for (String db : meerkatDBs) {
|
||||
webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH);
|
||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
// Only consider valid responses
|
||||
if (response.getStatusInfo() == Response.Status.OK
|
||||
|| response.getStatusInfo() == Response.Status.CREATED) {
|
||||
response.readEntity(BoolValue.class).getValue();
|
||||
} else {
|
||||
throw new CommunicationException("Server returned error. Status was: " + response.getStatus());
|
||||
}
|
||||
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(db, msg, 0);
|
||||
|
||||
worker.call();
|
||||
|
||||
}
|
||||
} catch (Exception e) { // Occurs only when server replies with valid status but invalid data
|
||||
throw new CommunicationException("Error accessing database: " + e.getMessage());
|
||||
|
|
|
@ -8,15 +8,15 @@ import com.google.protobuf.Int64Value;
|
|||
import com.google.protobuf.Timestamp;
|
||||
import meerkat.bulletinboard.workers.singleserver.*;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.protobuf.BulletinBoardAPI;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Crypto;
|
||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||
import meerkat.util.BulletinBoardUtils;
|
||||
|
||||
import javax.ws.rs.client.Client;
|
||||
import java.lang.Iterable;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -31,11 +31,17 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
* If the list of servers contains more than one server: the server actually used is the first one
|
||||
* The class further implements a delayed access to the server after a communication error occurs
|
||||
*/
|
||||
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient {
|
||||
public class SingleServerBulletinBoardClient implements SubscriptionBulletinBoardClient {
|
||||
|
||||
protected Client client;
|
||||
|
||||
protected BulletinBoardDigest digest;
|
||||
|
||||
private String dbAddress;
|
||||
|
||||
private final int MAX_RETRIES = 11;
|
||||
|
||||
private ListeningScheduledExecutorService executorService;
|
||||
private final ListeningScheduledExecutorService executorService;
|
||||
|
||||
private long lastServerErrorTime;
|
||||
|
||||
|
@ -54,6 +60,43 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
|
||||
}
|
||||
|
||||
private class SynchronousRetry<OUT> {
|
||||
|
||||
private final SingleServerWorker<?,OUT> worker;
|
||||
|
||||
private String thrown;
|
||||
|
||||
public SynchronousRetry(SingleServerWorker<?,OUT> worker) {
|
||||
this.worker = worker;
|
||||
this.thrown = "Could not contact server. Errors follow:\n";
|
||||
}
|
||||
|
||||
OUT run() throws CommunicationException {
|
||||
|
||||
do {
|
||||
|
||||
try {
|
||||
return worker.call();
|
||||
} catch (Exception e) {
|
||||
thrown += e.getCause() + " " + e.getMessage() + "\n";
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(failDelayInMilliseconds);
|
||||
} catch (InterruptedException e) {
|
||||
//TODO: log
|
||||
}
|
||||
|
||||
worker.decMaxRetry();
|
||||
|
||||
} while (worker.isRetry());
|
||||
|
||||
throw new CommunicationException(thrown);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This method adds a worker to the scheduled queue of the threadpool
|
||||
* If the server is in an accessible state: the job is submitted for immediate handling
|
||||
|
@ -225,7 +268,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
.build())
|
||||
.build();
|
||||
|
||||
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES);
|
||||
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES);
|
||||
|
||||
scheduleWorker(batchWorker, new ReadBatchCallback(msg, callback));
|
||||
|
||||
|
@ -266,20 +309,28 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
if (callback != null)
|
||||
callback.onSuccess(result);
|
||||
|
||||
// Remove last filter from list (MIN_ENTRY one)
|
||||
filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1);
|
||||
// Update filter if needed
|
||||
|
||||
// Add updated MIN_ENTRY filter (entry number is successor of last received entry's number)
|
||||
filterBuilder.addFilter(MessageFilter.newBuilder()
|
||||
.setType(FilterType.MIN_ENTRY)
|
||||
.setEntry(result.get(result.size() - 1).getEntryNum() + 1)
|
||||
.build());
|
||||
if (result.size() > 0) {
|
||||
|
||||
// Remove last filter from list (MIN_ENTRY one)
|
||||
filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1);
|
||||
|
||||
// Add updated MIN_ENTRY filter (entry number is successor of last received entry's number)
|
||||
filterBuilder.addFilter(MessageFilter.newBuilder()
|
||||
.setType(FilterType.MIN_ENTRY)
|
||||
.setEntry(result.get(result.size() - 1).getEntryNum() + 1)
|
||||
.build());
|
||||
|
||||
}
|
||||
|
||||
// Create new worker with updated task
|
||||
worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), 1);
|
||||
worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), MAX_RETRIES);
|
||||
|
||||
// Schedule the worker
|
||||
scheduleWorker(worker, this);
|
||||
RetryCallback<List<BulletinBoardMessage>> retryCallback = new RetryCallback<>(worker, this);
|
||||
|
||||
// Schedule the worker to run after the given interval has elapsed
|
||||
Futures.addCallback(executorService.schedule(worker, subscriptionIntervalInMilliseconds, TimeUnit.MILLISECONDS), retryCallback);
|
||||
|
||||
}
|
||||
|
||||
|
@ -324,21 +375,192 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
@Override
|
||||
public void init(BulletinBoardClientParams clientParams) {
|
||||
|
||||
// Perform usual setup
|
||||
super.init(clientParams);
|
||||
this.digest = new GenericBulletinBoardDigest(new SHA256Digest());
|
||||
|
||||
// Remove all but first DB address
|
||||
String dbAddress = meerkatDBs.get(0);
|
||||
meerkatDBs = new LinkedList<>();
|
||||
meerkatDBs.add(dbAddress);
|
||||
this.dbAddress = clientParams.getBulletinBoardAddress(0);
|
||||
|
||||
}
|
||||
|
||||
// Synchronous methods
|
||||
|
||||
@Override
|
||||
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
|
||||
|
||||
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<Boolean> retry = new SynchronousRetry<>(worker);
|
||||
|
||||
retry.run();
|
||||
|
||||
digest.reset();
|
||||
digest.update(msg);
|
||||
|
||||
return digest.digestAsMessageID();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getRedundancy(MessageID id) throws CommunicationException {
|
||||
|
||||
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<Float> retry = new SynchronousRetry<>(worker);
|
||||
|
||||
return retry.run();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) throws CommunicationException {
|
||||
|
||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<List<BulletinBoardMessage>> retry = new SynchronousRetry<>(worker);
|
||||
|
||||
return retry.run();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageID postAsBatch(BulletinBoardMessage msg, int chunkSize) throws CommunicationException {
|
||||
|
||||
// Begin the batch and obtain identifier
|
||||
|
||||
BeginBatchMessage beginBatchMessage = BeginBatchMessage.newBuilder()
|
||||
.addAllTag(msg.getMsg().getTagList())
|
||||
.build();
|
||||
|
||||
SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<Int64Value> beginRetry = new SynchronousRetry<>(beginBatchWorker);
|
||||
|
||||
Int64Value identifier = beginRetry.run();
|
||||
|
||||
// Post data chunks
|
||||
|
||||
List<BatchChunk> batchChunkList = BulletinBoardUtils.breakToBatch(msg, chunkSize);
|
||||
|
||||
BatchMessage.Builder builder = BatchMessage.newBuilder().setBatchId(identifier.getValue());
|
||||
|
||||
int position = 0;
|
||||
|
||||
for (BatchChunk data : batchChunkList) {
|
||||
|
||||
builder.setSerialNum(position).setData(data);
|
||||
|
||||
SingleServerPostBatchWorker dataWorker = new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<Boolean> dataRetry = new SynchronousRetry<>(dataWorker);
|
||||
|
||||
dataRetry.run();
|
||||
|
||||
// Increment position in batch
|
||||
position++;
|
||||
|
||||
}
|
||||
|
||||
// Close batch
|
||||
|
||||
CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder()
|
||||
.setBatchId(identifier.getValue())
|
||||
.addAllSig(msg.getSigList())
|
||||
.setTimestamp(msg.getMsg().getTimestamp())
|
||||
.setBatchLength(position)
|
||||
.build();
|
||||
|
||||
SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<Boolean> retry = new SynchronousRetry<>(closeBatchWorker);
|
||||
|
||||
retry.run();
|
||||
|
||||
// Calculate ID and return
|
||||
|
||||
digest.reset();
|
||||
digest.update(msg);
|
||||
|
||||
return digest.digestAsMessageID();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException {
|
||||
|
||||
// Retrieve message (which may be a stub)
|
||||
|
||||
MessageFilterList filterList = MessageFilterList.newBuilder()
|
||||
.addFilter(MessageFilter.newBuilder()
|
||||
.setType(FilterType.MSG_ID)
|
||||
.setId(msgID.getID())
|
||||
.build())
|
||||
.build();
|
||||
|
||||
SingleServerReadMessagesWorker stubWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<List<BulletinBoardMessage>> retry = new SynchronousRetry<>(stubWorker);
|
||||
|
||||
List<BulletinBoardMessage> messages = retry.run();
|
||||
|
||||
if (messages.size() <= 0) {
|
||||
throw new CommunicationException("Could not find message in database.");
|
||||
}
|
||||
|
||||
BulletinBoardMessage msg = messages.get(0);
|
||||
|
||||
if (msg.getMsg().getDataTypeCase() != UnsignedBulletinBoardMessage.DataTypeCase.MSGID) {
|
||||
|
||||
// We retrieved a complete message. Return it.
|
||||
return msg;
|
||||
|
||||
} else {
|
||||
|
||||
// We retrieved a stub. Retrieve data.
|
||||
return readBatchData(msg);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulletinBoardMessage readBatchData(BulletinBoardMessage stub) throws CommunicationException, IllegalArgumentException {
|
||||
|
||||
BatchQuery batchQuery = BatchQuery.newBuilder()
|
||||
.setMsgID(MessageID.newBuilder()
|
||||
.setID(stub.getMsg().getMsgId())
|
||||
.build())
|
||||
.setStartPosition(0)
|
||||
.build();
|
||||
|
||||
SingleServerReadBatchWorker readBatchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<List<BatchChunk>> batchRetry = new SynchronousRetry<>(readBatchWorker);
|
||||
|
||||
List<BatchChunk> batchChunkList = batchRetry.run();
|
||||
|
||||
return BulletinBoardUtils.gatherBatch(stub, batchChunkList);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
|
||||
|
||||
SingleServerGenerateSyncQueryWorker worker =
|
||||
new SingleServerGenerateSyncQueryWorker(dbAddress, generateSyncQueryParams, MAX_RETRIES);
|
||||
|
||||
SynchronousRetry<SyncQuery> retry = new SynchronousRetry<>(worker);
|
||||
|
||||
return retry.run();
|
||||
|
||||
}
|
||||
|
||||
// Asynchronous methods
|
||||
|
||||
@Override
|
||||
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback) {
|
||||
|
||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES);
|
||||
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(dbAddress, msg, MAX_RETRIES);
|
||||
|
||||
// Submit worker and create callback
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, callback));
|
||||
|
@ -453,7 +675,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
|
||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||
SingleServerBeginBatchWorker worker =
|
||||
new SingleServerBeginBatchWorker(meerkatDBs.get(0), beginBatchMessage, MAX_RETRIES);
|
||||
new SingleServerBeginBatchWorker(dbAddress, beginBatchMessage, MAX_RETRIES);
|
||||
|
||||
// Submit worker and create callback
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, new BeginBatchCallback(callback)));
|
||||
|
@ -491,7 +713,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
|
||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||
SingleServerPostBatchWorker worker =
|
||||
new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES);
|
||||
new SingleServerPostBatchWorker(dbAddress, builder.build(), MAX_RETRIES);
|
||||
|
||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, listCallback));
|
||||
|
@ -529,7 +751,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
|
||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||
SingleServerCloseBatchWorker worker =
|
||||
new SingleServerCloseBatchWorker(meerkatDBs.get(0), closeBatchMessage, MAX_RETRIES);
|
||||
new SingleServerCloseBatchWorker(dbAddress, closeBatchMessage, MAX_RETRIES);
|
||||
|
||||
// Submit worker and create callback
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, callback));
|
||||
|
@ -540,7 +762,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
public void getRedundancy(MessageID id, FutureCallback<Float> callback) {
|
||||
|
||||
// Create worker with no retries
|
||||
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1);
|
||||
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(dbAddress, id, 1);
|
||||
|
||||
// Submit job and create callback
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, callback));
|
||||
|
@ -551,7 +773,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
|
||||
|
||||
// Create job with no retries
|
||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1);
|
||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterList, 1);
|
||||
|
||||
// Submit job and create callback
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, callback));
|
||||
|
@ -575,7 +797,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
.setStartPosition(0)
|
||||
.build();
|
||||
|
||||
SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, MAX_RETRIES);
|
||||
SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(dbAddress, filterList, MAX_RETRIES);
|
||||
|
||||
// Submit jobs with wrapped callbacks
|
||||
scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, new CompleteMessageReadCallback(callback)));
|
||||
|
@ -598,7 +820,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
.setStartPosition(0)
|
||||
.build();
|
||||
|
||||
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES);
|
||||
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(dbAddress, batchQuery, MAX_RETRIES);
|
||||
|
||||
scheduleWorker(batchWorker, new RetryCallback<>(batchWorker, new ReadBatchCallback(stub, callback)));
|
||||
|
||||
|
@ -607,7 +829,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
@Override
|
||||
public void querySync(SyncQuery syncQuery, FutureCallback<SyncQueryResponse> callback) {
|
||||
|
||||
SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(meerkatDBs.get(0), syncQuery, MAX_RETRIES);
|
||||
SingleServerQuerySyncWorker worker = new SingleServerQuerySyncWorker(dbAddress, syncQuery, MAX_RETRIES);
|
||||
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, callback));
|
||||
|
||||
|
@ -633,7 +855,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
.build());
|
||||
|
||||
// Create job with no retries
|
||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), MAX_RETRIES);
|
||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(dbAddress, filterListBuilder.build(), MAX_RETRIES);
|
||||
|
||||
// Submit job and create callback that retries on failure and handles repeated subscription
|
||||
scheduleWorker(worker, new RetryCallback<>(worker, new SubscriptionCallback(worker, callback)));
|
||||
|
@ -647,8 +869,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
|||
@Override
|
||||
public void close() {
|
||||
|
||||
super.close();
|
||||
|
||||
executorService.shutdown();
|
||||
|
||||
}
|
||||
|
|
|
@ -250,7 +250,6 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
|
|||
super(filterList, callback);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onSuccess(List<BulletinBoardMessage> result) {
|
||||
|
||||
|
@ -274,5 +273,4 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
|
|||
subscribe(filterList, 0, callback);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
package meerkat.bulletinboard.workers.singleserver;
|
||||
|
||||
import com.google.protobuf.Int64Value;
|
||||
import meerkat.bulletinboard.SingleServerWorker;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.protobuf.BulletinBoardAPI.SyncQuery;
|
||||
import meerkat.protobuf.BulletinBoardAPI.GenerateSyncQueryParams;
|
||||
import meerkat.rest.Constants;
|
||||
|
||||
import javax.ws.rs.ProcessingException;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||
import static meerkat.bulletinboard.BulletinBoardConstants.GENERATE_SYNC_QUERY_PATH;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||
* Tries to contact server once and perform a Sync Query Generation operation
|
||||
*/
|
||||
public class SingleServerGenerateSyncQueryWorker extends SingleServerWorker<GenerateSyncQueryParams,SyncQuery> {
|
||||
|
||||
public SingleServerGenerateSyncQueryWorker(String serverAddress, GenerateSyncQueryParams payload, int maxRetry) {
|
||||
super(serverAddress, payload, maxRetry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncQuery call() throws Exception {
|
||||
|
||||
Client client = clientLocal.get();
|
||||
|
||||
WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(GENERATE_SYNC_QUERY_PATH);
|
||||
|
||||
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
try {
|
||||
|
||||
SyncQuery result = response.readEntity(SyncQuery.class);
|
||||
return result;
|
||||
|
||||
} catch (ProcessingException | IllegalStateException e) {
|
||||
|
||||
// Post to this server failed
|
||||
throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage());
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage());
|
||||
}
|
||||
finally {
|
||||
response.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -114,7 +114,7 @@ public class BulletinBoardSynchronizerTest {
|
|||
public void init() throws CommunicationException {
|
||||
|
||||
DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount));
|
||||
remoteServer.init(REMOTE_SERVER_ADDRESS);
|
||||
remoteServer.init();
|
||||
|
||||
remoteClient = new LocalBulletinBoardClient(
|
||||
remoteServer,
|
||||
|
@ -122,7 +122,7 @@ public class BulletinBoardSynchronizerTest {
|
|||
SUBSCRIPTION_INTERVAL);
|
||||
|
||||
DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount));
|
||||
localServer.init(LOCAL_SERVER_ADDRESS);
|
||||
localServer.init();
|
||||
|
||||
localClient = new LocalBulletinBoardClient(
|
||||
localServer,
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer;
|
||||
import meerkat.bulletinboard.sqlserver.H2QueryProvider;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.security.SignatureException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by Arbel on 6/27/2016.
|
||||
*/
|
||||
public class CachedBulletinBoardClientTest {
|
||||
|
||||
private static final int THREAD_NUM = 3;
|
||||
|
||||
private static final String LOCAL_DB_NAME = "localDB";
|
||||
private static final String REMOTE_DB_NAME = "remoteDB";
|
||||
private static final String QUEUE_DB_NAME = "queueDB";
|
||||
|
||||
private static final int SUBSRCIPTION_DELAY = 500;
|
||||
private static final int SYNC_DELAY = 500;
|
||||
|
||||
// Testers
|
||||
private CachedBulletinBoardClient cachedClient;
|
||||
private GenericBulletinBoardClientTester clientTest;
|
||||
private GenericSubscriptionClientTester subscriptionTester;
|
||||
|
||||
public CachedBulletinBoardClientTest() throws CommunicationException {
|
||||
|
||||
DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_DB_NAME));
|
||||
localServer.init();
|
||||
LocalBulletinBoardClient localClient = new LocalBulletinBoardClient(localServer, THREAD_NUM, SUBSRCIPTION_DELAY);
|
||||
|
||||
DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_DB_NAME));
|
||||
remoteServer.init();
|
||||
LocalBulletinBoardClient remoteClient = new LocalBulletinBoardClient(remoteServer, THREAD_NUM, SUBSRCIPTION_DELAY);
|
||||
|
||||
DeletableBulletinBoardServer queueServer = new BulletinBoardSQLServer(new H2QueryProvider(QUEUE_DB_NAME));
|
||||
queueServer.init();
|
||||
LocalBulletinBoardClient queueClient = new LocalBulletinBoardClient(queueServer, THREAD_NUM, SUBSRCIPTION_DELAY);
|
||||
|
||||
List<SubscriptionBulletinBoardClient> clientList = new LinkedList<>();
|
||||
clientList.add(remoteClient);
|
||||
|
||||
BulletinBoardSubscriber subscriber = new ThreadedBulletinBoardSubscriber(clientList, localClient);
|
||||
|
||||
cachedClient = new CachedBulletinBoardClient(localClient, remoteClient, subscriber, queueClient, SYNC_DELAY, SYNC_DELAY);
|
||||
subscriptionTester = new GenericSubscriptionClientTester(cachedClient);
|
||||
clientTest = new GenericBulletinBoardClientTester(cachedClient);
|
||||
|
||||
}
|
||||
|
||||
// Test methods
|
||||
|
||||
/**
|
||||
* Takes care of initializing the client and the test resources
|
||||
*/
|
||||
@Before
|
||||
public void init(){
|
||||
|
||||
clientTest.init();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the client and makes sure the test fails when an exception occurred in a separate thread
|
||||
*/
|
||||
|
||||
@After
|
||||
public void close() {
|
||||
|
||||
cachedClient.close();
|
||||
clientTest.close();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPost() {
|
||||
|
||||
clientTest.testPost();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||
|
||||
clientTest.testBatchPost();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||
|
||||
clientTest.testCompleteBatchPost();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscription() throws SignatureException, CommunicationException {
|
||||
|
||||
// subscriptionTester.init();
|
||||
// subscriptionTester.subscriptionTest();
|
||||
// subscriptionTester.close();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -178,7 +178,7 @@ public class GenericSubscriptionClientTester {
|
|||
public void onFailure(Throwable t) {
|
||||
System.err.println(t.getCause() + " " + t.getMessage());
|
||||
thrown.add(t);
|
||||
jobSemaphore.release(expectedMessages.size());
|
||||
jobSemaphore.release();
|
||||
stage = expectedMessages.size();
|
||||
}
|
||||
}
|
||||
|
@ -202,9 +202,9 @@ public class GenericSubscriptionClientTester {
|
|||
.build();
|
||||
|
||||
List<List<BulletinBoardMessage>> expectedMessages = new ArrayList<>(3);
|
||||
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
|
||||
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
|
||||
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
|
||||
expectedMessages.add(new LinkedList<>());
|
||||
expectedMessages.add(new LinkedList<>());
|
||||
expectedMessages.add(new LinkedList<>());
|
||||
expectedMessages.get(0).add(msg1);
|
||||
expectedMessages.get(2).add(msg3);
|
||||
|
||||
|
|
|
@ -7,12 +7,6 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.security.SignatureException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
|
@ -30,26 +24,10 @@ public class LocalBulletinBoardClientTest {
|
|||
|
||||
public LocalBulletinBoardClientTest() throws CommunicationException {
|
||||
|
||||
H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME) ;
|
||||
|
||||
try {
|
||||
|
||||
Connection conn = queryProvider.getDataSource().getConnection();
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
List<String> deletionQueries = queryProvider.getSchemaDeletionCommands();
|
||||
|
||||
for (String deletionQuery : deletionQueries) {
|
||||
stmt.execute(deletionQuery);
|
||||
}
|
||||
|
||||
} catch (SQLException e) {
|
||||
System.err.println(e.getMessage());
|
||||
throw new CommunicationException(e.getCause() + " " + e.getMessage());
|
||||
}
|
||||
H2QueryProvider queryProvider = new H2QueryProvider(DB_NAME);
|
||||
|
||||
DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider);
|
||||
server.init(DB_NAME);
|
||||
server.init();
|
||||
|
||||
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY);
|
||||
subscriptionTester = new GenericSubscriptionClientTester(client);
|
||||
|
@ -102,6 +80,7 @@ public class LocalBulletinBoardClientTest {
|
|||
|
||||
@Test
|
||||
public void testSubscription() throws SignatureException, CommunicationException {
|
||||
|
||||
subscriptionTester.init();
|
||||
subscriptionTester.subscriptionTest();
|
||||
subscriptionTester.close();
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.security.SignatureException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
*/
|
||||
public class SingleServerBulletinBoardClientIntegrationTest {
|
||||
|
||||
// Server data
|
||||
|
||||
private static final String PROP_GETTY_URL = "gretty.httpBaseURI";
|
||||
private static final String DEFAULT_BASE_URL = "http://localhost:8081";
|
||||
private static final String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL);
|
||||
|
||||
private static final int THREAD_NUM = 3;
|
||||
private static final long FAIL_DELAY = 3000;
|
||||
private static final long SUBSCRIPTION_INTERVAL = 500;
|
||||
|
||||
// Testers
|
||||
private GenericBulletinBoardClientTester clientTest;
|
||||
private GenericSubscriptionClientTester subscriptionTester;
|
||||
|
||||
public SingleServerBulletinBoardClientIntegrationTest(){
|
||||
|
||||
SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(THREAD_NUM, FAIL_DELAY, SUBSCRIPTION_INTERVAL);
|
||||
|
||||
List<String> testDB = new LinkedList<>();
|
||||
testDB.add(BASE_URL);
|
||||
|
||||
client.init(BulletinBoardClientParams.newBuilder()
|
||||
.addAllBulletinBoardAddress(testDB)
|
||||
.setMinRedundancy((float) 1.0)
|
||||
.build());
|
||||
|
||||
clientTest = new GenericBulletinBoardClientTester(client);
|
||||
subscriptionTester = new GenericSubscriptionClientTester(client);
|
||||
|
||||
}
|
||||
|
||||
// Test methods
|
||||
|
||||
/**
|
||||
* Takes care of initializing the client and the test resources
|
||||
*/
|
||||
@Before
|
||||
public void init(){
|
||||
|
||||
clientTest.init();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the client and makes sure the test fails when an exception occurred in a separate thread
|
||||
*/
|
||||
|
||||
@After
|
||||
public void close() {
|
||||
|
||||
clientTest.close();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPost() {
|
||||
|
||||
clientTest.testPost();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||
|
||||
clientTest.testBatchPost();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||
|
||||
clientTest.testCompleteBatchPost();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscription() throws SignatureException, CommunicationException {
|
||||
|
||||
subscriptionTester.init();
|
||||
subscriptionTester.subscriptionTest();
|
||||
subscriptionTester.close();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -368,7 +368,7 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
|
|||
* This method initializes the signatures, connects to the DB and creates the schema (if required).
|
||||
*/
|
||||
@Override
|
||||
public void init(String meerkatDB) throws CommunicationException {
|
||||
public void init() throws CommunicationException {
|
||||
// TODO write signature reading part.
|
||||
|
||||
digest = new GenericBulletinBoardDigest(new SHA256Digest());
|
||||
|
|
|
@ -44,14 +44,6 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
|||
bulletinBoard = (BulletinBoardServer) servletContext.getAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the BulletinBoard init method.
|
||||
*/
|
||||
@Override
|
||||
public void init(String meerkatDB) throws CommunicationException {
|
||||
bulletinBoard.init(meerkatDB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void contextInitialized(ServletContextEvent servletContextEvent) {
|
||||
ServletContext servletContext = servletContextEvent.getServletContext();
|
||||
|
@ -77,7 +69,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
|||
}
|
||||
|
||||
try {
|
||||
init(dbName);
|
||||
bulletinBoard.init();
|
||||
servletContext.setAttribute(BULLETIN_BOARD_ATTRIBUTE_NAME, bulletinBoard);
|
||||
} catch (CommunicationException e) {
|
||||
System.err.println(e.getMessage());
|
||||
|
|
|
@ -54,7 +54,7 @@ public class H2BulletinBoardServerTest {
|
|||
|
||||
BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider);
|
||||
try {
|
||||
bulletinBoardServer.init("");
|
||||
bulletinBoardServer.init();
|
||||
|
||||
} catch (CommunicationException e) {
|
||||
System.err.println(e.getMessage());
|
||||
|
|
|
@ -58,7 +58,7 @@ public class MySQLBulletinBoardServerTest {
|
|||
|
||||
BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(queryProvider);
|
||||
try {
|
||||
bulletinBoardServer.init("");
|
||||
bulletinBoardServer.init();
|
||||
|
||||
} catch (CommunicationException e) {
|
||||
System.err.println(e.getMessage());
|
||||
|
|
|
@ -39,7 +39,7 @@ public class SQLiteBulletinBoardServerTest{
|
|||
|
||||
BulletinBoardServer bulletinBoardServer = new BulletinBoardSQLServer(new SQLiteQueryProvider(testFilename));
|
||||
try {
|
||||
bulletinBoardServer.init("");
|
||||
bulletinBoardServer.init();
|
||||
|
||||
} catch (CommunicationException e) {
|
||||
System.err.println(e.getMessage());
|
||||
|
|
|
@ -31,8 +31,9 @@ public interface BulletinBoardClient {
|
|||
* Check how "safe" a given message is in a synchronous manner
|
||||
* @param id is the unique message identifier for retrieval
|
||||
* @return a normalized "redundancy score" from 0 (local only) to 1 (fully published)
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
float getRedundancy(MessageID id);
|
||||
float getRedundancy(MessageID id) throws CommunicationException;
|
||||
|
||||
/**
|
||||
* Read all messages posted matching the given filter in a synchronous manner
|
||||
|
|
|
@ -22,7 +22,7 @@ public interface BulletinBoardServer{
|
|||
* It also establishes the connection to the DB
|
||||
* @throws CommunicationException on DB connection error
|
||||
*/
|
||||
public void init(String meerkatDB) throws CommunicationException;
|
||||
public void init() throws CommunicationException;
|
||||
|
||||
/**
|
||||
* Post a message to bulletin board.
|
||||
|
|
Loading…
Reference in New Issue