Working Integration test for Threaded BB Client supporting Batches.
Haven't tested subscriptions yet.Bulletin-Board-Batch
parent
3fed32f9e6
commit
9a78330e29
|
@ -2,7 +2,7 @@ package meerkat.bulletinboard;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
* This is a general class for handling multi-server work
|
* This is a general class for handling multi-server work
|
||||||
* It utilizes Single Server Clients to perform the actual per-server work
|
* It utilizes Single Server Clients to perform the actual per-server work
|
||||||
*/
|
*/
|
||||||
public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN> implements Runnable, ClientCallback<OUT>{
|
public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN> implements Runnable, FutureCallback<OUT>{
|
||||||
|
|
||||||
private List<SingleServerBulletinBoardClient> clients;
|
private List<SingleServerBulletinBoardClient> clients;
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
|
|
||||||
private AtomicBoolean returnedResult;
|
private AtomicBoolean returnedResult;
|
||||||
|
|
||||||
private ClientCallback<OUT> clientCallback;
|
private FutureCallback<OUT> futureCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -35,11 +35,11 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
* @param minServers is the minimal amount of servers needed in order to successfully complete the job
|
* @param minServers is the minimal amount of servers needed in order to successfully complete the job
|
||||||
* @param payload is the payload for the job
|
* @param payload is the payload for the job
|
||||||
* @param maxRetry is the maximal per-server retry count
|
* @param maxRetry is the maximal per-server retry count
|
||||||
* @param clientCallback contains the callback methods used to report the result back to the client
|
* @param futureCallback contains the callback methods used to report the result back to the client
|
||||||
*/
|
*/
|
||||||
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients, boolean shuffleClients,
|
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients, boolean shuffleClients,
|
||||||
int minServers, IN payload, int maxRetry,
|
int minServers, IN payload, int maxRetry,
|
||||||
ClientCallback<OUT> clientCallback) {
|
FutureCallback<OUT> futureCallback) {
|
||||||
|
|
||||||
super(payload,maxRetry);
|
super(payload,maxRetry);
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
|
|
||||||
this.minServers = new AtomicInteger(minServers);
|
this.minServers = new AtomicInteger(minServers);
|
||||||
maxFailedServers = new AtomicInteger(clients.size() - minServers);
|
maxFailedServers = new AtomicInteger(clients.size() - minServers);
|
||||||
this.clientCallback = clientCallback;
|
this.futureCallback = futureCallback;
|
||||||
|
|
||||||
returnedResult = new AtomicBoolean(false);
|
returnedResult = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@ -61,9 +61,9 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
*/
|
*/
|
||||||
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, IN payload, int maxRetry,
|
int minServers, IN payload, int maxRetry,
|
||||||
ClientCallback<OUT> clientCallback) {
|
FutureCallback<OUT> futureCallback) {
|
||||||
|
|
||||||
this(clients, false, minServers, payload, maxRetry, clientCallback);
|
this(clients, false, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +74,7 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
*/
|
*/
|
||||||
protected void succeed(OUT result){
|
protected void succeed(OUT result){
|
||||||
if (returnedResult.compareAndSet(false, true)) {
|
if (returnedResult.compareAndSet(false, true)) {
|
||||||
clientCallback.handleCallback(result);
|
futureCallback.onSuccess(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +85,7 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
|
||||||
*/
|
*/
|
||||||
protected void fail(Throwable t){
|
protected void fail(Throwable t){
|
||||||
if (returnedResult.compareAndSet(false, true)) {
|
if (returnedResult.compareAndSet(false, true)) {
|
||||||
clientCallback.handleFailure(t);
|
futureCallback.onFailure(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,11 +8,16 @@ import com.google.protobuf.ByteString;
|
||||||
import meerkat.bulletinboard.workers.singleserver.*;
|
import meerkat.bulletinboard.workers.singleserver.*;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||||
|
import meerkat.util.BulletinBoardUtils;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Arbel Deutsch Peled on 28-Dec-15.
|
* Created by Arbel Deutsch Peled on 28-Dec-15.
|
||||||
|
@ -24,7 +29,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
*/
|
*/
|
||||||
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient {
|
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient {
|
||||||
|
|
||||||
private final int MAX_RETRIES = 10;
|
private final int MAX_RETRIES = 11;
|
||||||
|
|
||||||
protected ListeningScheduledExecutorService executorService;
|
protected ListeningScheduledExecutorService executorService;
|
||||||
|
|
||||||
|
@ -34,6 +39,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
|
|
||||||
protected final long failDelayInMilliseconds;
|
protected final long failDelayInMilliseconds;
|
||||||
|
|
||||||
|
protected final long subscriptionIntervalInMilliseconds;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify the client that a job has failed
|
* Notify the client that a job has failed
|
||||||
* This makes new scheduled jobs be scheduled for a later time (after the given delay)
|
* This makes new scheduled jobs be scheduled for a later time (after the given delay)
|
||||||
|
@ -80,16 +87,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
class RetryCallback<T> implements FutureCallback<T> {
|
class RetryCallback<T> implements FutureCallback<T> {
|
||||||
|
|
||||||
private SingleServerWorker worker;
|
private SingleServerWorker worker;
|
||||||
private ClientCallback<T> clientCallback;
|
private FutureCallback<T> futureCallback;
|
||||||
|
|
||||||
public RetryCallback(SingleServerWorker worker, ClientCallback<T> clientCallback) {
|
public RetryCallback(SingleServerWorker worker, FutureCallback<T> futureCallback) {
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
this.clientCallback = clientCallback;
|
this.futureCallback = futureCallback;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(T result) {
|
public void onSuccess(T result) {
|
||||||
clientCallback.handleCallback(result);
|
futureCallback.onSuccess(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -107,19 +114,210 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
scheduleWorker(worker, this);
|
scheduleWorker(worker, this);
|
||||||
} else {
|
} else {
|
||||||
// No more retries: notify caller about failure
|
// No more retries: notify caller about failure
|
||||||
clientCallback.handleFailure(t);
|
futureCallback.onFailure(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This callback ties together all the per-batch-data callbacks into a single callback
|
||||||
|
* It reports success back to the user only if all of the batch-data were successfully posted
|
||||||
|
* If any batch-data fails to post: this callback reports failure
|
||||||
|
*/
|
||||||
|
class PostBatchDataListCallback implements FutureCallback<Boolean> {
|
||||||
|
|
||||||
public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds) {
|
private FutureCallback<Boolean> callback;
|
||||||
|
private AtomicInteger batchDataRemaining;
|
||||||
|
private AtomicBoolean aggregatedResult;
|
||||||
|
|
||||||
|
public PostBatchDataListCallback(int batchDataLength, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
this.callback = callback;
|
||||||
|
this.batchDataRemaining = new AtomicInteger(batchDataLength);
|
||||||
|
this.aggregatedResult = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Boolean result) {
|
||||||
|
|
||||||
|
if (result){
|
||||||
|
this.aggregatedResult.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (batchDataRemaining.decrementAndGet() == 0){
|
||||||
|
callback.onSuccess(this.aggregatedResult.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
|
||||||
|
// Notify caller about failure
|
||||||
|
callback.onFailure(t);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This callback ties together the different parts of a CompleteBatch as they arrive from the server
|
||||||
|
* It assembles a CompleteBatch from the parts and sends it to the user if all parts arrived
|
||||||
|
* If any part fails to arrive: it invokes the onFailure method
|
||||||
|
*/
|
||||||
|
class CompleteBatchReadCallback {
|
||||||
|
|
||||||
|
private FutureCallback callback;
|
||||||
|
|
||||||
|
private List<BatchData> batchDataList;
|
||||||
|
private BulletinBoardMessage batchMessage;
|
||||||
|
|
||||||
|
private AtomicInteger remainingQueries;
|
||||||
|
private AtomicBoolean failed;
|
||||||
|
|
||||||
|
public CompleteBatchReadCallback(FutureCallback callback) {
|
||||||
|
|
||||||
|
this.callback = callback;
|
||||||
|
|
||||||
|
remainingQueries = new AtomicInteger(2);
|
||||||
|
failed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void combineAndReturn() {
|
||||||
|
|
||||||
|
final String[] prefixes = {
|
||||||
|
BulletinBoardConstants.BATCH_ID_TAG_PREFIX,
|
||||||
|
BulletinBoardConstants.BATCH_TAG};
|
||||||
|
|
||||||
|
if (remainingQueries.decrementAndGet() == 0){
|
||||||
|
|
||||||
|
BeginBatchMessage beginBatchMessage =
|
||||||
|
BeginBatchMessage.newBuilder()
|
||||||
|
.setSignerId(batchMessage.getSig(0).getSignerId())
|
||||||
|
.setBatchId(Integer.parseInt(
|
||||||
|
BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX)))
|
||||||
|
.addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(prefixes)))
|
||||||
|
.build();
|
||||||
|
callback.onSuccess(new CompleteBatch(beginBatchMessage, batchDataList, batchMessage.getSig(0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void fail(Throwable t) {
|
||||||
|
if (failed.compareAndSet(false, true)) {
|
||||||
|
callback.onFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a FutureCallback for the Batch Data List that ties to this object
|
||||||
|
*/
|
||||||
|
public FutureCallback<List<BatchData>> asBatchDataListFutureCallback() {
|
||||||
|
return new FutureCallback<List<BatchData>>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(List<BatchData> result) {
|
||||||
|
batchDataList = result;
|
||||||
|
|
||||||
|
combineAndReturn();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
fail(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a FutureCallback for the Bulletin Board Message that ties to this object
|
||||||
|
*/
|
||||||
|
public FutureCallback<List<BulletinBoardMessage>> asBulletinBoardMessageListFutureCallback() {
|
||||||
|
return new FutureCallback<List<BulletinBoardMessage>>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(List<BulletinBoardMessage> result) {
|
||||||
|
if (result.size() < 1){
|
||||||
|
onFailure(new IllegalArgumentException("Server returned empty message list"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
batchMessage = result.get(0);
|
||||||
|
|
||||||
|
combineAndReturn();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
fail(t);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inner class for handling returned values of subscription operations
|
||||||
|
* This class's methods also ensure continued operation of the subscription
|
||||||
|
*/
|
||||||
|
class SubscriptionCallback implements FutureCallback<List<BulletinBoardMessage>> {
|
||||||
|
|
||||||
|
private SingleServerReadMessagesWorker worker;
|
||||||
|
private MessageHandler messageHandler;
|
||||||
|
|
||||||
|
private MessageFilterList.Builder filterBuilder;
|
||||||
|
|
||||||
|
public SubscriptionCallback(SingleServerReadMessagesWorker worker, MessageHandler messageHandler) {
|
||||||
|
this.worker = worker;
|
||||||
|
this.messageHandler = messageHandler;
|
||||||
|
filterBuilder = worker.getPayload().toBuilder();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(List<BulletinBoardMessage> result) {
|
||||||
|
|
||||||
|
// Report new messages to user
|
||||||
|
messageHandler.handleNewMessages(result);
|
||||||
|
|
||||||
|
// Remove last filter from list (MIN_ENTRY one)
|
||||||
|
filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1);
|
||||||
|
|
||||||
|
// Add updated MIN_ENTRY filter (entry number is successor of last received entry's number)
|
||||||
|
filterBuilder.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.MIN_ENTRY)
|
||||||
|
.setEntry(result.get(result.size() - 1).getEntryNum() + 1)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
// Create new worker with updated task
|
||||||
|
worker = new SingleServerReadMessagesWorker(worker.serverAddress, filterBuilder.build(), 1);
|
||||||
|
|
||||||
|
// Schedule the worker
|
||||||
|
scheduleWorker(worker, this);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
|
||||||
|
// Notify client about failure
|
||||||
|
fail();
|
||||||
|
|
||||||
|
// Reschedule exact same task
|
||||||
|
scheduleWorker(worker, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds, long subscriptionIntervalInMilliseconds) {
|
||||||
|
|
||||||
executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize));
|
executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize));
|
||||||
|
|
||||||
this.failDelayInMilliseconds = failDelayInMilliseconds;
|
this.failDelayInMilliseconds = failDelayInMilliseconds;
|
||||||
|
this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds;
|
||||||
|
|
||||||
// Set server error time to a time sufficiently in the past to make new jobs go through
|
// Set server error time to a time sufficiently in the past to make new jobs go through
|
||||||
lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds;
|
lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds;
|
||||||
|
@ -147,7 +345,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback) {
|
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES);
|
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES);
|
||||||
|
@ -162,18 +360,18 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class PostBatchDataCallback implements ClientCallback<Boolean> {
|
private class PostBatchDataCallback implements FutureCallback<Boolean> {
|
||||||
|
|
||||||
private CompleteBatch completeBatch;
|
private CompleteBatch completeBatch;
|
||||||
ClientCallback<Boolean> callback;
|
FutureCallback<Boolean> callback;
|
||||||
|
|
||||||
public PostBatchDataCallback(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public PostBatchDataCallback(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
|
||||||
this.completeBatch = completeBatch;
|
this.completeBatch = completeBatch;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(Boolean msg) {
|
public void onSuccess(Boolean msg) {
|
||||||
closeBatch(
|
closeBatch(
|
||||||
CloseBatchMessage.newBuilder()
|
CloseBatchMessage.newBuilder()
|
||||||
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId())
|
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId())
|
||||||
|
@ -185,24 +383,24 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
callback.handleFailure(t);
|
callback.onFailure(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class BeginBatchCallback implements ClientCallback<Boolean> {
|
private class BeginBatchCallback implements FutureCallback<Boolean> {
|
||||||
|
|
||||||
private CompleteBatch completeBatch;
|
private CompleteBatch completeBatch;
|
||||||
ClientCallback<Boolean> callback;
|
FutureCallback<Boolean> callback;
|
||||||
|
|
||||||
public BeginBatchCallback(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public BeginBatchCallback(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
|
||||||
this.completeBatch = completeBatch;
|
this.completeBatch = completeBatch;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(Boolean msg) {
|
public void onSuccess(Boolean msg) {
|
||||||
|
|
||||||
postBatchData(
|
postBatchData(
|
||||||
completeBatch.getBeginBatchMessage().getSignerId(),
|
completeBatch.getBeginBatchMessage().getSignerId(),
|
||||||
|
@ -213,13 +411,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
callback.handleFailure(t);
|
callback.onFailure(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
beginBatch(
|
beginBatch(
|
||||||
completeBatch.getBeginBatchMessage(),
|
completeBatch.getBeginBatchMessage(),
|
||||||
|
@ -233,7 +431,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback) {
|
public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
SingleServerBeginBatchWorker worker =
|
SingleServerBeginBatchWorker worker =
|
||||||
|
@ -246,12 +444,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
BatchMessage.Builder builder = BatchMessage.newBuilder()
|
BatchMessage.Builder builder = BatchMessage.newBuilder()
|
||||||
.setSignerId(signerId)
|
.setSignerId(signerId)
|
||||||
.setBatchId(batchId);
|
.setBatchId(batchId);
|
||||||
|
|
||||||
|
// Create a unified callback to aggregate successful posts
|
||||||
|
|
||||||
|
PostBatchDataListCallback listCallback = new PostBatchDataListCallback(batchDataList.size(), callback);
|
||||||
|
|
||||||
// Iterate through data list
|
// Iterate through data list
|
||||||
|
|
||||||
for (BatchData data : batchDataList) {
|
for (BatchData data : batchDataList) {
|
||||||
|
@ -262,7 +464,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES);
|
new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES);
|
||||||
|
|
||||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
scheduleWorker(worker, new RetryCallback(worker, callback));
|
scheduleWorker(worker, new RetryCallback(worker, listCallback));
|
||||||
|
|
||||||
// Increment position in batch
|
// Increment position in batch
|
||||||
startPosition++;
|
startPosition++;
|
||||||
|
@ -271,7 +473,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
|
@ -279,21 +481,21 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback);
|
postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback) {
|
public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create worker with redundancy 1 and MAX_RETRIES retries
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
SingleServerCloseBatchWorker worker =
|
SingleServerCloseBatchWorker worker =
|
||||||
|
@ -305,7 +507,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
|
public void getRedundancy(MessageID id, FutureCallback<Float> callback) {
|
||||||
|
|
||||||
// Create worker with no retries
|
// Create worker with no retries
|
||||||
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1);
|
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1);
|
||||||
|
@ -316,7 +518,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
|
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
|
||||||
|
|
||||||
// Create job with no retries
|
// Create job with no retries
|
||||||
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1);
|
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1);
|
||||||
|
@ -327,19 +529,65 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback) {
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback<CompleteBatch> callback) {
|
||||||
|
|
||||||
// Create job with no retries
|
// Create job with no retries for retrieval of the Bulletin Board Message that defines the batch
|
||||||
SingleServerReadBatchWorker worker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1);
|
|
||||||
|
|
||||||
// Submit job and create callback
|
MessageFilterList filterList = MessageFilterList.newBuilder()
|
||||||
scheduleWorker(worker, new RetryCallback(worker, callback));
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.TAG)
|
||||||
|
.setTag(BulletinBoardConstants.BATCH_TAG)
|
||||||
|
.build())
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.TAG)
|
||||||
|
.setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId())
|
||||||
|
.build())
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.SIGNER_ID)
|
||||||
|
.setId(batchSpecificationMessage.getSignerId())
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1);
|
||||||
|
|
||||||
|
// Create job with no retries for retrieval of the Batch Data List
|
||||||
|
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1);
|
||||||
|
|
||||||
|
// Create callback that will combine the two worker products
|
||||||
|
CompleteBatchReadCallback completeBatchReadCallback = new CompleteBatchReadCallback(callback);
|
||||||
|
|
||||||
|
// Submit jobs with wrapped callbacks
|
||||||
|
scheduleWorker(messageWorker, new RetryCallback(messageWorker, completeBatchReadCallback.asBulletinBoardMessageListFutureCallback()));
|
||||||
|
scheduleWorker(batchWorker, new RetryCallback(batchWorker, completeBatchReadCallback.asBatchDataListFutureCallback()));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) {
|
public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) {
|
||||||
|
|
||||||
|
// Remove all existing MIN_ENTRY filters and create new one that starts at 0
|
||||||
|
|
||||||
|
MessageFilterList.Builder filterListBuilder = filterList.toBuilder();
|
||||||
|
|
||||||
|
Iterator<MessageFilter> iterator = filterListBuilder.getFilterList().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
MessageFilter filter = iterator.next();
|
||||||
|
|
||||||
|
if (filter.getType() == FilterType.MIN_ENTRY){
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
filterListBuilder.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.MIN_ENTRY)
|
||||||
|
.setEntry(0)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
// Create job with no retries
|
||||||
|
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterListBuilder.build(), 1);
|
||||||
|
|
||||||
|
// Submit job and create callback
|
||||||
|
scheduleWorker(worker, new SubscriptionCallback(worker, messageHandler));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package meerkat.bulletinboard;
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
import meerkat.bulletinboard.workers.multiserver.*;
|
import meerkat.bulletinboard.workers.multiserver.*;
|
||||||
|
@ -37,6 +38,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
|
|
||||||
private static final int SERVER_THREADPOOL_SIZE = 5;
|
private static final int SERVER_THREADPOOL_SIZE = 5;
|
||||||
private static final long FAIL_DELAY = 5000;
|
private static final long FAIL_DELAY = 5000;
|
||||||
|
private static final long SUBSCRIPTION_INTERVAL = 10000;
|
||||||
|
|
||||||
private int minAbsoluteRedundancy;
|
private int minAbsoluteRedundancy;
|
||||||
|
|
||||||
|
@ -59,11 +61,16 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
|
|
||||||
clients = new ArrayList<SingleServerBulletinBoardClient>(clientParams.getBulletinBoardAddressCount());
|
clients = new ArrayList<SingleServerBulletinBoardClient>(clientParams.getBulletinBoardAddressCount());
|
||||||
for (String address : clientParams.getBulletinBoardAddressList()){
|
for (String address : clientParams.getBulletinBoardAddressList()){
|
||||||
SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY);
|
|
||||||
|
SingleServerBulletinBoardClient client =
|
||||||
|
new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY, SUBSCRIPTION_INTERVAL);
|
||||||
|
|
||||||
client.init(BulletinBoardClientParams.newBuilder()
|
client.init(BulletinBoardClientParams.newBuilder()
|
||||||
.addBulletinBoardAddress(address)
|
.addBulletinBoardAddress(address)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
clients.add(client);
|
clients.add(client);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -76,7 +83,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
* @throws CommunicationException
|
* @throws CommunicationException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback){
|
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback){
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerPostMessageWorker worker =
|
MultiServerPostMessageWorker worker =
|
||||||
|
@ -93,7 +100,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerPostBatchWorker worker =
|
MultiServerPostBatchWorker worker =
|
||||||
|
@ -110,7 +117,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback) {
|
public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerBeginBatchWorker worker =
|
MultiServerBeginBatchWorker worker =
|
||||||
|
@ -123,7 +130,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
BatchDataContainer batchDataContainer = new BatchDataContainer(signerId, batchId, batchDataList, startPosition);
|
BatchDataContainer batchDataContainer = new BatchDataContainer(signerId, batchId, batchDataList, startPosition);
|
||||||
|
|
||||||
|
@ -137,7 +144,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
|
@ -145,21 +152,21 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId.toByteArray(), batchId, batchDataList, startPosition, callback);
|
postBatchData(signerId.toByteArray(), batchId, batchDataList, startPosition, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback) {
|
public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback<Boolean> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerCloseBatchWorker worker =
|
MultiServerCloseBatchWorker worker =
|
||||||
|
@ -177,7 +184,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
* Ignore communication exceptions in specific databases
|
* Ignore communication exceptions in specific databases
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
|
public void getRedundancy(MessageID id, FutureCallback<Float> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerGetRedundancyWorker worker =
|
MultiServerGetRedundancyWorker worker =
|
||||||
|
@ -194,7 +201,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
* If no operation is successful: return null (NOT blank list)
|
* If no operation is successful: return null (NOT blank list)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
|
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerReadMessagesWorker worker =
|
MultiServerReadMessagesWorker worker =
|
||||||
|
@ -206,7 +213,7 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback) {
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback<CompleteBatch> callback) {
|
||||||
|
|
||||||
// Create job
|
// Create job
|
||||||
MultiServerReadBatchWorker worker =
|
MultiServerReadBatchWorker worker =
|
||||||
|
@ -227,6 +234,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
super.close();
|
super.close();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
for (SingleServerBulletinBoardClient client : clients){
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
while (! executorService.isShutdown()) {
|
while (! executorService.isShutdown()) {
|
||||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage;
|
import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage;
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ public class MultiServerBeginBatchWorker extends MultiServerGenericPostWorker<Be
|
||||||
|
|
||||||
public MultiServerBeginBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerBeginBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, BeginBatchMessage payload, int maxRetry,
|
int minServers, BeginBatchMessage payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage;
|
import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage;
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker<Cl
|
||||||
|
|
||||||
public MultiServerCloseBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerCloseBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, CloseBatchMessage payload, int maxRetry,
|
int minServers, CloseBatchMessage payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
|
@ -17,9 +18,9 @@ public abstract class MultiServerGenericPostWorker<T> extends MultiServerWorker<
|
||||||
|
|
||||||
public MultiServerGenericPostWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerGenericPostWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, T payload, int maxRetry,
|
int minServers, T payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,11 +36,6 @@ public abstract class MultiServerGenericPostWorker<T> extends MultiServerWorker<
|
||||||
*/
|
*/
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
||||||
WebTarget webTarget;
|
|
||||||
Response response;
|
|
||||||
|
|
||||||
int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request.
|
|
||||||
|
|
||||||
// Iterate through servers
|
// Iterate through servers
|
||||||
|
|
||||||
Iterator<SingleServerBulletinBoardClient> clientIterator = getClientIterator();
|
Iterator<SingleServerBulletinBoardClient> clientIterator = getClientIterator();
|
||||||
|
@ -56,7 +52,7 @@ public abstract class MultiServerGenericPostWorker<T> extends MultiServerWorker<
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(Boolean result) {
|
public void onSuccess(Boolean result) {
|
||||||
if (result){
|
if (result){
|
||||||
if (minServers.decrementAndGet() <= 0){
|
if (minServers.decrementAndGet() <= 0){
|
||||||
succeed(Boolean.TRUE);
|
succeed(Boolean.TRUE);
|
||||||
|
@ -65,7 +61,7 @@ public abstract class MultiServerGenericPostWorker<T> extends MultiServerWorker<
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
if (maxFailedServers.decrementAndGet() < 0){
|
if (maxFailedServers.decrementAndGet() < 0){
|
||||||
fail(t);
|
fail(t);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
|
@ -18,9 +18,9 @@ public abstract class MultiServerGenericReadWorker<IN, OUT> extends MultiServerW
|
||||||
|
|
||||||
public MultiServerGenericReadWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerGenericReadWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, IN payload, int maxRetry,
|
int minServers, IN payload, int maxRetry,
|
||||||
ClientCallback<OUT> clientCallback) {
|
FutureCallback<OUT> futureCallback) {
|
||||||
|
|
||||||
super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
|
super(clients, true, minServers, payload, maxRetry, futureCallback); // Shuffle clients on creation to balance load
|
||||||
|
|
||||||
clientIterator = getClientIterator();
|
clientIterator = getClientIterator();
|
||||||
|
|
||||||
|
@ -41,10 +41,10 @@ public abstract class MultiServerGenericReadWorker<IN, OUT> extends MultiServerW
|
||||||
|
|
||||||
if (clientIterator.hasNext()) {
|
if (clientIterator.hasNext()) {
|
||||||
|
|
||||||
// Send request to Server
|
// Get next server
|
||||||
SingleServerBulletinBoardClient client = clientIterator.next();
|
SingleServerBulletinBoardClient client = clientIterator.next();
|
||||||
|
|
||||||
// Retrieve answer
|
// Retrieve answer from server
|
||||||
doRead(payload, client);
|
doRead(payload, client);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -54,12 +54,12 @@ public abstract class MultiServerGenericReadWorker<IN, OUT> extends MultiServerW
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(OUT msg) {
|
public void onSuccess(OUT msg) {
|
||||||
succeed(msg);
|
succeed(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
run(); // Retry with next server
|
run(); // Retry with next server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
|
@ -20,9 +20,9 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker<MessageID,
|
||||||
|
|
||||||
public MultiServerGetRedundancyWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerGetRedundancyWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, MessageID payload, int maxRetry,
|
int minServers, MessageID payload, int maxRetry,
|
||||||
ClientCallback<Float> clientCallback) {
|
FutureCallback<Float> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
|
super(clients, minServers, payload, maxRetry, futureCallback); // Shuffle clients on creation to balance load
|
||||||
|
|
||||||
serversContainingMessage = new AtomicInteger(0);
|
serversContainingMessage = new AtomicInteger(0);
|
||||||
totalContactedServers = new AtomicInteger(0);
|
totalContactedServers = new AtomicInteger(0);
|
||||||
|
@ -54,7 +54,7 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker<MessageID,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(Float result) {
|
public void onSuccess(Float result) {
|
||||||
|
|
||||||
if (result > 0.5) {
|
if (result > 0.5) {
|
||||||
serversContainingMessage.incrementAndGet();
|
serversContainingMessage.incrementAndGet();
|
||||||
|
@ -67,8 +67,8 @@ public class MultiServerGetRedundancyWorker extends MultiServerWorker<MessageID,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
handleCallback(new Float(0.0));
|
onSuccess(new Float(0.0));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.bulletinboard.BatchDataContainer;
|
import meerkat.bulletinboard.BatchDataContainer;
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ public class MultiServerPostBatchDataWorker extends MultiServerGenericPostWorker
|
||||||
|
|
||||||
public MultiServerPostBatchDataWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerPostBatchDataWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, BatchDataContainer payload, int maxRetry,
|
int minServers, BatchDataContainer payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.CompleteBatch;
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ public class MultiServerPostBatchWorker extends MultiServerGenericPostWorker<Com
|
||||||
|
|
||||||
public MultiServerPostBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerPostBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, CompleteBatch payload, int maxRetry,
|
int minServers, CompleteBatch payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ public class MultiServerPostMessageWorker extends MultiServerGenericPostWorker<B
|
||||||
|
|
||||||
public MultiServerPostMessageWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerPostMessageWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, BulletinBoardMessage payload, int maxRetry,
|
int minServers, BulletinBoardMessage payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
FutureCallback<Boolean> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.CompleteBatch;
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BatchSpecificationMessage;
|
import meerkat.protobuf.BulletinBoardAPI.BatchSpecificationMessage;
|
||||||
|
@ -15,9 +15,9 @@ public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker<Bat
|
||||||
|
|
||||||
public MultiServerReadBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerReadBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, BatchSpecificationMessage payload, int maxRetry,
|
int minServers, BatchSpecificationMessage payload, int maxRetry,
|
||||||
ClientCallback<CompleteBatch> clientCallback) {
|
FutureCallback<CompleteBatch> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package meerkat.bulletinboard.workers.multiserver;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
@ -14,9 +14,9 @@ public class MultiServerReadMessagesWorker extends MultiServerGenericReadWorker<
|
||||||
|
|
||||||
public MultiServerReadMessagesWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerReadMessagesWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, MessageFilterList payload, int maxRetry,
|
int minServers, MessageFilterList payload, int maxRetry,
|
||||||
ClientCallback<List<BulletinBoardMessage>> clientCallback) {
|
FutureCallback<List<BulletinBoardMessage>> futureCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_ID_TAG_PREFIX;
|
||||||
/**
|
/**
|
||||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
*/
|
*/
|
||||||
public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecificationMessage, CompleteBatch> {
|
public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecificationMessage, List<BatchData>> {
|
||||||
|
|
||||||
public SingleServerReadBatchWorker(String serverAddress, BatchSpecificationMessage payload, int maxRetry) {
|
public SingleServerReadBatchWorker(String serverAddress, BatchSpecificationMessage payload, int maxRetry) {
|
||||||
super(serverAddress, payload, maxRetry);
|
super(serverAddress, payload, maxRetry);
|
||||||
|
@ -35,59 +35,13 @@ public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecifi
|
||||||
* @return the complete batch as read from the server
|
* @return the complete batch as read from the server
|
||||||
* @throws CommunicationException if the server's response is invalid
|
* @throws CommunicationException if the server's response is invalid
|
||||||
*/
|
*/
|
||||||
public CompleteBatch call() throws CommunicationException{
|
public List<BatchData> call() throws CommunicationException{
|
||||||
|
|
||||||
CompleteBatch completeBatch = new CompleteBatch();
|
|
||||||
|
|
||||||
Client client = clientLocal.get();
|
Client client = clientLocal.get();
|
||||||
|
|
||||||
WebTarget webTarget;
|
WebTarget webTarget;
|
||||||
Response response;
|
Response response;
|
||||||
|
|
||||||
// Set filters for the batch message metadata retrieval
|
|
||||||
|
|
||||||
MessageFilterList messageFilterList = MessageFilterList.newBuilder()
|
|
||||||
.addFilter(MessageFilter.newBuilder()
|
|
||||||
.setType(FilterType.TAG)
|
|
||||||
.setTag(BATCH_ID_TAG_PREFIX + String.valueOf(payload.getBatchId()))
|
|
||||||
.build())
|
|
||||||
.addFilter(MessageFilter.newBuilder()
|
|
||||||
.setType(FilterType.SIGNER_ID)
|
|
||||||
.setId(payload.getSignerId())
|
|
||||||
.build())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// Send request to Server
|
|
||||||
|
|
||||||
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
|
|
||||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
|
||||||
Entity.entity(messageFilterList, Constants.MEDIATYPE_PROTOBUF));
|
|
||||||
|
|
||||||
// Retrieve answer
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
// If a BulletinBoardMessageList is returned: the read was successful
|
|
||||||
BulletinBoardMessage metadata = response.readEntity(BulletinBoardMessageList.class).getMessage(0);
|
|
||||||
|
|
||||||
completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder()
|
|
||||||
.setSignerId(payload.getSignerId())
|
|
||||||
.setBatchId(payload.getBatchId())
|
|
||||||
.addAllTag(metadata.getMsg().getTagList())
|
|
||||||
.build());
|
|
||||||
|
|
||||||
completeBatch.setSignature(metadata.getSig(0));
|
|
||||||
|
|
||||||
} catch (ProcessingException | IllegalStateException e) {
|
|
||||||
|
|
||||||
// Read failed
|
|
||||||
throw new CommunicationException("Could not contact the server");
|
|
||||||
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
response.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the batch data
|
// Get the batch data
|
||||||
|
|
||||||
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH);
|
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH);
|
||||||
|
@ -98,9 +52,8 @@ public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecifi
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// If a List of BatchData is returned: the read was successful
|
// If a BatchDataList is returned: the read was successful
|
||||||
|
return response.readEntity(BatchDataList.class).getDataList();
|
||||||
completeBatch.appendBatchData(response.readEntity(new GenericType<List<BatchData>>(){}));
|
|
||||||
|
|
||||||
} catch (ProcessingException | IllegalStateException e) {
|
} catch (ProcessingException | IllegalStateException e) {
|
||||||
|
|
||||||
|
@ -112,8 +65,6 @@ public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecifi
|
||||||
response.close();
|
response.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
return completeBatch;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,214 +0,0 @@
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient;
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
|
||||||
import meerkat.bulletinboard.ThreadedBulletinBoardClient;
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
|
||||||
import meerkat.protobuf.Crypto;
|
|
||||||
|
|
||||||
import meerkat.protobuf.Voting.*;
|
|
||||||
import meerkat.util.BulletinBoardMessageComparator;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
|
||||||
import static org.hamcrest.number.OrderingComparison.*;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.Semaphore;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
|
||||||
*/
|
|
||||||
public class BulletinBoardClientIntegrationTest {
|
|
||||||
|
|
||||||
Semaphore jobSemaphore;
|
|
||||||
Vector<Throwable> thrown;
|
|
||||||
|
|
||||||
protected void genericHandleFailure(Throwable t){
|
|
||||||
System.err.println(t.getCause() + " " + t.getMessage());
|
|
||||||
thrown.add(t);
|
|
||||||
jobSemaphore.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class PostCallback implements ClientCallback<Boolean>{
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleCallback(Boolean msg) {
|
|
||||||
System.err.println("Post operation completed");
|
|
||||||
jobSemaphore.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleFailure(Throwable t) {
|
|
||||||
genericHandleFailure(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class RedundancyCallback implements ClientCallback<Float>{
|
|
||||||
|
|
||||||
private float minRedundancy;
|
|
||||||
|
|
||||||
public RedundancyCallback(float minRedundancy) {
|
|
||||||
this.minRedundancy = minRedundancy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleCallback(Float redundancy) {
|
|
||||||
System.err.println("Redundancy found is: " + redundancy);
|
|
||||||
jobSemaphore.release();
|
|
||||||
assertThat(redundancy, greaterThanOrEqualTo(minRedundancy));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleFailure(Throwable t) {
|
|
||||||
genericHandleFailure(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class ReadCallback implements ClientCallback<List<BulletinBoardMessage>>{
|
|
||||||
|
|
||||||
private List<BulletinBoardMessage> expectedMsgList;
|
|
||||||
|
|
||||||
public ReadCallback(List<BulletinBoardMessage> expectedMsgList) {
|
|
||||||
this.expectedMsgList = expectedMsgList;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleCallback(List<BulletinBoardMessage> messages) {
|
|
||||||
|
|
||||||
System.err.println(messages);
|
|
||||||
jobSemaphore.release();
|
|
||||||
|
|
||||||
BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator();
|
|
||||||
|
|
||||||
assertThat(messages.size(), is(expectedMsgList.size()));
|
|
||||||
|
|
||||||
Iterator<BulletinBoardMessage> expectedMessageIterator = expectedMsgList.iterator();
|
|
||||||
Iterator<BulletinBoardMessage> receivedMessageIterator = messages.iterator();
|
|
||||||
|
|
||||||
while (expectedMessageIterator.hasNext()) {
|
|
||||||
assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleFailure(Throwable t) {
|
|
||||||
genericHandleFailure(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private AsyncBulletinBoardClient bulletinBoardClient;
|
|
||||||
|
|
||||||
private PostCallback postCallback;
|
|
||||||
private RedundancyCallback redundancyCallback;
|
|
||||||
private ReadCallback readCallback;
|
|
||||||
|
|
||||||
private static String PROP_GETTY_URL = "gretty.httpBaseURI";
|
|
||||||
private static String DEFAULT_BASE_URL = "http://localhost:8081";
|
|
||||||
private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL);
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void init(){
|
|
||||||
|
|
||||||
bulletinBoardClient = new ThreadedBulletinBoardClient();
|
|
||||||
|
|
||||||
List<String> testDB = new LinkedList<String>();
|
|
||||||
testDB.add(BASE_URL);
|
|
||||||
|
|
||||||
bulletinBoardClient.init(BulletinBoardClientParams.newBuilder()
|
|
||||||
.addBulletinBoardAddress("http://localhost:8081")
|
|
||||||
.setMinRedundancy((float) 1.0)
|
|
||||||
.build());
|
|
||||||
|
|
||||||
postCallback = new PostCallback();
|
|
||||||
redundancyCallback = new RedundancyCallback((float) 1.0);
|
|
||||||
|
|
||||||
thrown = new Vector<>();
|
|
||||||
jobSemaphore = new Semaphore(0);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void postTest() {
|
|
||||||
|
|
||||||
byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
|
|
||||||
byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14};
|
|
||||||
byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24};
|
|
||||||
byte[] b4 = {(byte) 4, (byte) 5, (byte) 100, (byte) -50, (byte) 0};
|
|
||||||
|
|
||||||
BulletinBoardMessage msg;
|
|
||||||
|
|
||||||
MessageFilterList filterList;
|
|
||||||
List<BulletinBoardMessage> msgList;
|
|
||||||
|
|
||||||
MessageID messageID;
|
|
||||||
|
|
||||||
Comparator<BulletinBoardMessage> msgComparator = new BulletinBoardMessageComparator();
|
|
||||||
|
|
||||||
msg = BulletinBoardMessage.newBuilder()
|
|
||||||
.setMsg(UnsignedBulletinBoardMessage.newBuilder()
|
|
||||||
.addTag("Signature")
|
|
||||||
.addTag("Trustee")
|
|
||||||
.setData(ByteString.copyFrom(b1))
|
|
||||||
.build())
|
|
||||||
.addSig(Crypto.Signature.newBuilder()
|
|
||||||
.setType(Crypto.SignatureType.DSA)
|
|
||||||
.setData(ByteString.copyFrom(b2))
|
|
||||||
.setSignerId(ByteString.copyFrom(b3))
|
|
||||||
.build())
|
|
||||||
.addSig(Crypto.Signature.newBuilder()
|
|
||||||
.setType(Crypto.SignatureType.ECDSA)
|
|
||||||
.setData(ByteString.copyFrom(b3))
|
|
||||||
.setSignerId(ByteString.copyFrom(b2))
|
|
||||||
.build())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
messageID = bulletinBoardClient.postMessage(msg,postCallback);
|
|
||||||
|
|
||||||
try {
|
|
||||||
jobSemaphore.acquire();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
System.err.println(e.getCause() + " " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
bulletinBoardClient.getRedundancy(messageID,redundancyCallback);
|
|
||||||
|
|
||||||
filterList = MessageFilterList.newBuilder()
|
|
||||||
.addFilter(
|
|
||||||
MessageFilter.newBuilder()
|
|
||||||
.setType(FilterType.TAG)
|
|
||||||
.setTag("Signature")
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
.addFilter(
|
|
||||||
MessageFilter.newBuilder()
|
|
||||||
.setType(FilterType.TAG)
|
|
||||||
.setTag("Trustee")
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
msgList = new LinkedList<BulletinBoardMessage>();
|
|
||||||
msgList.add(msg);
|
|
||||||
|
|
||||||
readCallback = new ReadCallback(msgList);
|
|
||||||
|
|
||||||
bulletinBoardClient.readMessages(filterList, readCallback);
|
|
||||||
try {
|
|
||||||
jobSemaphore.acquire(2);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
System.err.println(e.getCause() + " " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
bulletinBoardClient.close();
|
|
||||||
|
|
||||||
if (thrown.size() > 0) {
|
|
||||||
assert false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,541 @@
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient;
|
||||||
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
|
import meerkat.bulletinboard.GenericBatchDigitalSignature;
|
||||||
|
import meerkat.bulletinboard.ThreadedBulletinBoardClient;
|
||||||
|
import meerkat.comm.CommunicationException;
|
||||||
|
import meerkat.crypto.concrete.ECDSASignature;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
import meerkat.protobuf.Crypto;
|
||||||
|
|
||||||
|
import meerkat.protobuf.Voting.*;
|
||||||
|
import meerkat.util.BulletinBoardMessageComparator;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
|
import static org.hamcrest.number.OrderingComparison.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.security.*;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||||
|
*/
|
||||||
|
public class ThreadedBulletinBoardClientIntegrationTest {
|
||||||
|
|
||||||
|
// Signature resources
|
||||||
|
|
||||||
|
private GenericBatchDigitalSignature signers[];
|
||||||
|
private ByteString[] signerIDs;
|
||||||
|
|
||||||
|
private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12";
|
||||||
|
private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12";
|
||||||
|
|
||||||
|
private static String KEYFILE_PASSWORD1 = "secret";
|
||||||
|
private static String KEYFILE_PASSWORD3 = "shh";
|
||||||
|
|
||||||
|
public static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
|
||||||
|
public static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt";
|
||||||
|
|
||||||
|
// Server data
|
||||||
|
|
||||||
|
private static String PROP_GETTY_URL = "gretty.httpBaseURI";
|
||||||
|
private static String DEFAULT_BASE_URL = "http://localhost:8081";
|
||||||
|
private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL);
|
||||||
|
|
||||||
|
// Client and callbacks
|
||||||
|
|
||||||
|
private AsyncBulletinBoardClient bulletinBoardClient;
|
||||||
|
|
||||||
|
private PostCallback postCallback;
|
||||||
|
private PostCallback failPostCallback = new PostCallback(true,false);
|
||||||
|
|
||||||
|
private RedundancyCallback redundancyCallback;
|
||||||
|
private ReadCallback readCallback;
|
||||||
|
private ReadBatchCallback readBatchCallback;
|
||||||
|
|
||||||
|
// Sync and misc
|
||||||
|
|
||||||
|
private Semaphore jobSemaphore;
|
||||||
|
private Vector<Throwable> thrown;
|
||||||
|
private Random random;
|
||||||
|
|
||||||
|
// Constructor
|
||||||
|
|
||||||
|
public ThreadedBulletinBoardClientIntegrationTest(){
|
||||||
|
|
||||||
|
signers = new GenericBatchDigitalSignature[2];
|
||||||
|
signerIDs = new ByteString[signers.length];
|
||||||
|
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
|
||||||
|
signers[1] = new GenericBatchDigitalSignature(new ECDSASignature());
|
||||||
|
|
||||||
|
InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE);
|
||||||
|
char[] password = KEYFILE_PASSWORD1.toCharArray();
|
||||||
|
|
||||||
|
KeyStore.Builder keyStoreBuilder = null;
|
||||||
|
try {
|
||||||
|
keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password);
|
||||||
|
|
||||||
|
signers[0].loadSigningCertificate(keyStoreBuilder);
|
||||||
|
|
||||||
|
signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE));
|
||||||
|
|
||||||
|
keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3);
|
||||||
|
password = KEYFILE_PASSWORD3.toCharArray();
|
||||||
|
|
||||||
|
keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password);
|
||||||
|
signers[1].loadSigningCertificate(keyStoreBuilder);
|
||||||
|
|
||||||
|
signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE));
|
||||||
|
|
||||||
|
for (int i = 0 ; i < signers.length ; i++) {
|
||||||
|
signerIDs[i] = signers[i].getSignerID();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Failed reading from signature file " + e.getMessage());
|
||||||
|
fail("Failed reading from signature file " + e.getMessage());
|
||||||
|
} catch (CertificateException e) {
|
||||||
|
System.err.println("Failed reading certificate " + e.getMessage());
|
||||||
|
fail("Failed reading certificate " + e.getMessage());
|
||||||
|
} catch (KeyStoreException e) {
|
||||||
|
System.err.println("Failed reading keystore " + e.getMessage());
|
||||||
|
fail("Failed reading keystore " + e.getMessage());
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
System.err.println("Couldn't find signing algorithm " + e.getMessage());
|
||||||
|
fail("Couldn't find signing algorithm " + e.getMessage());
|
||||||
|
} catch (UnrecoverableKeyException e) {
|
||||||
|
System.err.println("Couldn't find signing key " + e.getMessage());
|
||||||
|
fail("Couldn't find signing key " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Callback definitions
|
||||||
|
|
||||||
|
protected void genericHandleFailure(Throwable t){
|
||||||
|
System.err.println(t.getCause() + " " + t.getMessage());
|
||||||
|
thrown.add(t);
|
||||||
|
jobSemaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class PostCallback implements FutureCallback<Boolean>{
|
||||||
|
|
||||||
|
private boolean isAssert;
|
||||||
|
private boolean assertValue;
|
||||||
|
|
||||||
|
public PostCallback() {
|
||||||
|
this(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PostCallback(boolean isAssert) {
|
||||||
|
this(isAssert,true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PostCallback(boolean isAssert, boolean assertValue) {
|
||||||
|
this.isAssert = isAssert;
|
||||||
|
this.assertValue = assertValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Boolean msg) {
|
||||||
|
System.err.println("Post operation completed");
|
||||||
|
jobSemaphore.release();
|
||||||
|
//TODO: Change Assert mechanism to exception one
|
||||||
|
if (isAssert) {
|
||||||
|
if (assertValue) {
|
||||||
|
assertThat("Post operation failed", msg, is(Boolean.TRUE));
|
||||||
|
} else {
|
||||||
|
assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
genericHandleFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class RedundancyCallback implements FutureCallback<Float>{
|
||||||
|
|
||||||
|
private float minRedundancy;
|
||||||
|
|
||||||
|
public RedundancyCallback(float minRedundancy) {
|
||||||
|
this.minRedundancy = minRedundancy;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Float redundancy) {
|
||||||
|
System.err.println("Redundancy found is: " + redundancy);
|
||||||
|
jobSemaphore.release();
|
||||||
|
assertThat(redundancy, greaterThanOrEqualTo(minRedundancy));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
genericHandleFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReadCallback implements FutureCallback<List<BulletinBoardMessage>>{
|
||||||
|
|
||||||
|
private List<BulletinBoardMessage> expectedMsgList;
|
||||||
|
|
||||||
|
public ReadCallback(List<BulletinBoardMessage> expectedMsgList) {
|
||||||
|
this.expectedMsgList = expectedMsgList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(List<BulletinBoardMessage> messages) {
|
||||||
|
|
||||||
|
System.err.println(messages);
|
||||||
|
jobSemaphore.release();
|
||||||
|
|
||||||
|
BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator();
|
||||||
|
|
||||||
|
assertThat(messages.size(), is(expectedMsgList.size()));
|
||||||
|
|
||||||
|
Iterator<BulletinBoardMessage> expectedMessageIterator = expectedMsgList.iterator();
|
||||||
|
Iterator<BulletinBoardMessage> receivedMessageIterator = messages.iterator();
|
||||||
|
|
||||||
|
while (expectedMessageIterator.hasNext()) {
|
||||||
|
assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
genericHandleFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReadBatchCallback implements FutureCallback<CompleteBatch> {
|
||||||
|
|
||||||
|
private CompleteBatch expectedBatch;
|
||||||
|
|
||||||
|
public ReadBatchCallback(CompleteBatch expectedBatch) {
|
||||||
|
this.expectedBatch = expectedBatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(CompleteBatch batch) {
|
||||||
|
|
||||||
|
System.err.println(batch);
|
||||||
|
jobSemaphore.release();
|
||||||
|
|
||||||
|
assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
genericHandleFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Randomness generators
|
||||||
|
|
||||||
|
private byte randomByte(){
|
||||||
|
return (byte) random.nextInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] randomByteArray(int length) {
|
||||||
|
|
||||||
|
byte[] randomBytes = new byte[length];
|
||||||
|
|
||||||
|
for (int i = 0; i < length ; i++){
|
||||||
|
randomBytes[i] = randomByte();
|
||||||
|
}
|
||||||
|
|
||||||
|
return randomBytes;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException {
|
||||||
|
|
||||||
|
CompleteBatch completeBatch = new CompleteBatch();
|
||||||
|
|
||||||
|
// Create data
|
||||||
|
|
||||||
|
completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder()
|
||||||
|
.setSignerId(signerIDs[signer])
|
||||||
|
.setBatchId(batchId)
|
||||||
|
.addTag("Test")
|
||||||
|
.build());
|
||||||
|
|
||||||
|
for (int i = 0 ; i < length ; i++){
|
||||||
|
|
||||||
|
BatchData batchData = BatchData.newBuilder()
|
||||||
|
.setData(ByteString.copyFrom(randomByteArray(i)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
completeBatch.appendBatchData(batchData);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
signers[signer].updateContent(completeBatch);
|
||||||
|
|
||||||
|
completeBatch.setSignature(signers[signer].sign());
|
||||||
|
|
||||||
|
return completeBatch;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test methods
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes care of initializing the client and the test resources
|
||||||
|
*/
|
||||||
|
@Before
|
||||||
|
public void init(){
|
||||||
|
|
||||||
|
bulletinBoardClient = new ThreadedBulletinBoardClient();
|
||||||
|
|
||||||
|
random = new Random(0); // We use insecure randomness in tests for repeatability
|
||||||
|
|
||||||
|
List<String> testDB = new LinkedList<String>();
|
||||||
|
testDB.add(BASE_URL);
|
||||||
|
|
||||||
|
bulletinBoardClient.init(BulletinBoardClientParams.newBuilder()
|
||||||
|
.addBulletinBoardAddress("http://localhost:8081")
|
||||||
|
.setMinRedundancy((float) 1.0)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
postCallback = new PostCallback();
|
||||||
|
redundancyCallback = new RedundancyCallback((float) 1.0);
|
||||||
|
|
||||||
|
thrown = new Vector<>();
|
||||||
|
jobSemaphore = new Semaphore(0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes the client and makes sure the test fails when an exception occurred in a separate thread
|
||||||
|
*/
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void close() {
|
||||||
|
|
||||||
|
bulletinBoardClient.close();
|
||||||
|
|
||||||
|
if (thrown.size() > 0) {
|
||||||
|
assert false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the standard post, redundancy and read methods
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void postTest() {
|
||||||
|
|
||||||
|
byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
|
||||||
|
byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14};
|
||||||
|
byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24};
|
||||||
|
byte[] b4 = {(byte) 4, (byte) 5, (byte) 100, (byte) -50, (byte) 0};
|
||||||
|
|
||||||
|
BulletinBoardMessage msg;
|
||||||
|
|
||||||
|
MessageFilterList filterList;
|
||||||
|
List<BulletinBoardMessage> msgList;
|
||||||
|
|
||||||
|
MessageID messageID;
|
||||||
|
|
||||||
|
Comparator<BulletinBoardMessage> msgComparator = new BulletinBoardMessageComparator();
|
||||||
|
|
||||||
|
msg = BulletinBoardMessage.newBuilder()
|
||||||
|
.setMsg(UnsignedBulletinBoardMessage.newBuilder()
|
||||||
|
.addTag("Signature")
|
||||||
|
.addTag("Trustee")
|
||||||
|
.setData(ByteString.copyFrom(b1))
|
||||||
|
.build())
|
||||||
|
.addSig(Crypto.Signature.newBuilder()
|
||||||
|
.setType(Crypto.SignatureType.DSA)
|
||||||
|
.setData(ByteString.copyFrom(b2))
|
||||||
|
.setSignerId(ByteString.copyFrom(b3))
|
||||||
|
.build())
|
||||||
|
.addSig(Crypto.Signature.newBuilder()
|
||||||
|
.setType(Crypto.SignatureType.ECDSA)
|
||||||
|
.setData(ByteString.copyFrom(b3))
|
||||||
|
.setSignerId(ByteString.copyFrom(b2))
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
messageID = bulletinBoardClient.postMessage(msg,postCallback);
|
||||||
|
|
||||||
|
try {
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
System.err.println(e.getCause() + " " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
bulletinBoardClient.getRedundancy(messageID,redundancyCallback);
|
||||||
|
|
||||||
|
filterList = MessageFilterList.newBuilder()
|
||||||
|
.addFilter(
|
||||||
|
MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.TAG)
|
||||||
|
.setTag("Signature")
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.addFilter(
|
||||||
|
MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.TAG)
|
||||||
|
.setTag("Trustee")
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
msgList = new LinkedList<BulletinBoardMessage>();
|
||||||
|
msgList.add(msg);
|
||||||
|
|
||||||
|
readCallback = new ReadCallback(msgList);
|
||||||
|
|
||||||
|
bulletinBoardClient.readMessages(filterList, readCallback);
|
||||||
|
try {
|
||||||
|
jobSemaphore.acquire(2);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
System.err.println(e.getCause() + " " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests posting a batch by parts
|
||||||
|
* Also tests not being able to post to a closed batch
|
||||||
|
* @throws CommunicationException, SignatureException, InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||||
|
|
||||||
|
final int SIGNER = 1;
|
||||||
|
final int BATCH_ID = 100;
|
||||||
|
final int BATCH_LENGTH = 100;
|
||||||
|
|
||||||
|
CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH);
|
||||||
|
|
||||||
|
// Begin batch
|
||||||
|
|
||||||
|
bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
// Post data
|
||||||
|
|
||||||
|
bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
// Close batch
|
||||||
|
|
||||||
|
CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder()
|
||||||
|
.setBatchId(BATCH_ID)
|
||||||
|
.setBatchLength(BATCH_LENGTH)
|
||||||
|
.setSig(completeBatch.getSignature())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
bulletinBoardClient.closeBatch(closeBatchMessage, postCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
// Attempt to open batch again
|
||||||
|
|
||||||
|
bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback);
|
||||||
|
|
||||||
|
// Attempt to add batch data
|
||||||
|
|
||||||
|
bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire(2);
|
||||||
|
|
||||||
|
// Read batch data
|
||||||
|
|
||||||
|
BatchSpecificationMessage batchSpecificationMessage =
|
||||||
|
BatchSpecificationMessage.newBuilder()
|
||||||
|
.setSignerId(signerIDs[SIGNER])
|
||||||
|
.setBatchId(BATCH_ID)
|
||||||
|
.setStartPosition(0)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
readBatchCallback = new ReadBatchCallback(completeBatch);
|
||||||
|
|
||||||
|
bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Posts a complete batch message
|
||||||
|
* Checks reading od the message
|
||||||
|
* @throws CommunicationException, SignatureException, InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException {
|
||||||
|
|
||||||
|
final int SIGNER = 0;
|
||||||
|
final int BATCH_ID = 101;
|
||||||
|
final int BATCH_LENGTH = 50;
|
||||||
|
|
||||||
|
// Post batch
|
||||||
|
|
||||||
|
CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH);
|
||||||
|
|
||||||
|
bulletinBoardClient.postBatch(completeBatch,postCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
// Read batch
|
||||||
|
|
||||||
|
BatchSpecificationMessage batchSpecificationMessage =
|
||||||
|
BatchSpecificationMessage.newBuilder()
|
||||||
|
.setSignerId(signerIDs[SIGNER])
|
||||||
|
.setBatchId(BATCH_ID)
|
||||||
|
.setStartPosition(0)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
readBatchCallback = new ReadBatchCallback(completeBatch);
|
||||||
|
|
||||||
|
bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that an unopened batch cannot be closed
|
||||||
|
* @throws CommunicationException, InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInvalidBatchClose() throws CommunicationException, InterruptedException {
|
||||||
|
|
||||||
|
final int NON_EXISTENT_BATCH_ID = 999;
|
||||||
|
|
||||||
|
CloseBatchMessage closeBatchMessage =
|
||||||
|
CloseBatchMessage.newBuilder()
|
||||||
|
.setBatchId(NON_EXISTENT_BATCH_ID)
|
||||||
|
.setBatchLength(1)
|
||||||
|
.setSig(Crypto.Signature.getDefaultInstance())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Try to close the (unopened) batch;
|
||||||
|
|
||||||
|
bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback);
|
||||||
|
|
||||||
|
jobSemaphore.acquire();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -48,7 +48,7 @@ dependencies {
|
||||||
|
|
||||||
// JDBC connections
|
// JDBC connections
|
||||||
compile 'org.springframework:spring-jdbc:4.2.+'
|
compile 'org.springframework:spring-jdbc:4.2.+'
|
||||||
compile 'org.xerial:sqlite-jdbc:3.7.+'
|
compile 'org.xerial:sqlite-jdbc:3.8.+'
|
||||||
compile 'mysql:mysql-connector-java:5.1.+'
|
compile 'mysql:mysql-connector-java:5.1.+'
|
||||||
compile 'com.h2database:h2:1.0.+'
|
compile 'com.h2database:h2:1.0.+'
|
||||||
|
|
||||||
|
@ -89,6 +89,11 @@ task h2Test(type: Test) {
|
||||||
outputs.upToDateWhen { false }
|
outputs.upToDateWhen { false }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task liteTest(type: Test) {
|
||||||
|
include '**/*SQLite*Test*'
|
||||||
|
outputs.upToDateWhen { false }
|
||||||
|
}
|
||||||
|
|
||||||
task dbTest(type: Test) {
|
task dbTest(type: Test) {
|
||||||
include '**/*H2*Test*'
|
include '**/*H2*Test*'
|
||||||
include '**/*MySQL*Test*'
|
include '**/*MySQL*Test*'
|
||||||
|
|
|
@ -177,8 +177,9 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
case MSG_ID:
|
case MSG_ID:
|
||||||
return MSG_ID;
|
return MSG_ID;
|
||||||
|
|
||||||
case EXACT_ENTRY: // Go through
|
case EXACT_ENTRY: // Go through
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY: // Go through
|
||||||
|
case MIN_ENTRY:
|
||||||
return ENTRY_NUM;
|
return ENTRY_NUM;
|
||||||
|
|
||||||
case SIGNER_ID:
|
case SIGNER_ID:
|
||||||
|
@ -253,12 +254,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
|
|
||||||
switch (messageFilter.getType()) {
|
switch (messageFilter.getType()) {
|
||||||
|
|
||||||
case MSG_ID: // Go through
|
case MSG_ID: // Go through
|
||||||
case SIGNER_ID:
|
case SIGNER_ID:
|
||||||
return messageFilter.getId().toByteArray();
|
return messageFilter.getId().toByteArray();
|
||||||
|
|
||||||
case EXACT_ENTRY: // Go through
|
case EXACT_ENTRY: // Go through
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY: // Go through
|
||||||
|
case MIN_ENTRY:
|
||||||
return messageFilter.getEntry();
|
return messageFilter.getEntry();
|
||||||
|
|
||||||
case TAG:
|
case TAG:
|
||||||
|
@ -653,7 +655,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
|
|
||||||
for (int i=0 ; i < tags.length ; i++) {
|
for (int i=0 ; i < tags.length ; i++) {
|
||||||
namedParameters[i] = new MapSqlParameterSource();
|
namedParameters[i] = new MapSqlParameterSource();
|
||||||
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(0),message.getSignerId());
|
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(0),message.getSignerId().toByteArray());
|
||||||
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(1),message.getBatchId());
|
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(1),message.getBatchId());
|
||||||
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(2),tags[i]);
|
namedParameters[i].addValue(QueryType.CONNECT_BATCH_TAG.getParamName(2),tags[i]);
|
||||||
}
|
}
|
||||||
|
@ -675,7 +677,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
String sql = sqlQueryProvider.getSQLString(QueryType.INSERT_BATCH_DATA);
|
String sql = sqlQueryProvider.getSQLString(QueryType.INSERT_BATCH_DATA);
|
||||||
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
||||||
|
|
||||||
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(0),batchMessage.getSignerId());
|
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(0),batchMessage.getSignerId().toByteArray());
|
||||||
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(1),batchMessage.getBatchId());
|
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(1),batchMessage.getBatchId());
|
||||||
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(2),batchMessage.getSerialNum());
|
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(2),batchMessage.getSerialNum());
|
||||||
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(3),batchMessage.getData().toByteArray());
|
namedParameters.addValue(QueryType.INSERT_BATCH_DATA.getParamName(3),batchMessage.getData().toByteArray());
|
||||||
|
@ -700,7 +702,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
||||||
|
|
||||||
|
|
||||||
namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(0),signerId);
|
namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(0),signerId.toByteArray());
|
||||||
namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(1),batchId);
|
namedParameters.addValue(QueryType.CHECK_BATCH_LENGTH.getParamName(1),batchId);
|
||||||
|
|
||||||
List<Long> lengthResult = jdbcTemplate.query(sql, namedParameters, new LongMapper());
|
List<Long> lengthResult = jdbcTemplate.query(sql, namedParameters, new LongMapper());
|
||||||
|
@ -733,7 +735,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
||||||
namedParameters = new MapSqlParameterSource();
|
namedParameters = new MapSqlParameterSource();
|
||||||
|
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),signerId);
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),signerId.toByteArray());
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),batchId);
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),batchId);
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),0); // Read from the beginning
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),0); // Read from the beginning
|
||||||
|
|
||||||
|
@ -775,7 +777,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
sql = sqlQueryProvider.getSQLString(QueryType.REMOVE_BATCH_TAGS);
|
sql = sqlQueryProvider.getSQLString(QueryType.REMOVE_BATCH_TAGS);
|
||||||
namedParameters = new MapSqlParameterSource();
|
namedParameters = new MapSqlParameterSource();
|
||||||
|
|
||||||
namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(0), signerId);
|
namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(0), signerId.toByteArray());
|
||||||
namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(1), batchId);
|
namedParameters.addValue(QueryType.REMOVE_BATCH_TAGS.getParamName(1), batchId);
|
||||||
|
|
||||||
jdbcTemplate.update(sql, namedParameters);
|
jdbcTemplate.update(sql, namedParameters);
|
||||||
|
@ -786,7 +788,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<BatchData> readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException{
|
public BatchDataList readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException{
|
||||||
|
|
||||||
// Check that batch is closed
|
// Check that batch is closed
|
||||||
if (!isBatchClosed(message.getSignerId(), message.getBatchId())) {
|
if (!isBatchClosed(message.getSignerId(), message.getBatchId())) {
|
||||||
|
@ -796,11 +798,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
String sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
String sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
||||||
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
|
||||||
|
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),message.getSignerId());
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),message.getSignerId().toByteArray());
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),message.getBatchId());
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),message.getBatchId());
|
||||||
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),message.getStartPosition());
|
namedParameters.addValue(QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2),message.getStartPosition());
|
||||||
|
|
||||||
return jdbcTemplate.query(sql, namedParameters, new BatchDataMapper());
|
return BatchDataList.newBuilder()
|
||||||
|
.addAllData(jdbcTemplate.query(sql, namedParameters, new BatchDataMapper()))
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -134,6 +134,8 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
|
||||||
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY:
|
||||||
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
|
case MIN_ENTRY:
|
||||||
|
return "MsgTable.EntryNum >= :EntryNum" + serialString;
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "LIMIT :Limit" + serialString;
|
return "LIMIT :Limit" + serialString;
|
||||||
case MSG_ID:
|
case MSG_ID:
|
||||||
|
@ -157,6 +159,7 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
|
||||||
switch(filterType) {
|
switch(filterType) {
|
||||||
case EXACT_ENTRY: // Go through
|
case EXACT_ENTRY: // Go through
|
||||||
case MAX_ENTRY: // Go through
|
case MAX_ENTRY: // Go through
|
||||||
|
case MIN_ENTRY: // Go through
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "INT";
|
return "INT";
|
||||||
|
|
||||||
|
|
|
@ -151,6 +151,8 @@ public class MySQLQueryProvider implements SQLQueryProvider {
|
||||||
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY:
|
||||||
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
|
case MIN_ENTRY:
|
||||||
|
return "MsgTable.EntryNum >= :EntryNum" + serialString;
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "LIMIT :Limit" + serialString;
|
return "LIMIT :Limit" + serialString;
|
||||||
case MSG_ID:
|
case MSG_ID:
|
||||||
|
@ -174,6 +176,7 @@ public class MySQLQueryProvider implements SQLQueryProvider {
|
||||||
switch(filterType) {
|
switch(filterType) {
|
||||||
case EXACT_ENTRY: // Go through
|
case EXACT_ENTRY: // Go through
|
||||||
case MAX_ENTRY: // Go through
|
case MAX_ENTRY: // Go through
|
||||||
|
case MIN_ENTRY: // Go through
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "INT";
|
return "INT";
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,8 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi
|
||||||
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY:
|
||||||
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
|
case MIN_ENTRY:
|
||||||
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "LIMIT = :Limit" + serialString;
|
return "LIMIT = :Limit" + serialString;
|
||||||
case MSG_ID:
|
case MSG_ID:
|
||||||
|
@ -77,6 +79,7 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi
|
||||||
switch(filterType) {
|
switch(filterType) {
|
||||||
case EXACT_ENTRY: // Go through
|
case EXACT_ENTRY: // Go through
|
||||||
case MAX_ENTRY: // Go through
|
case MAX_ENTRY: // Go through
|
||||||
|
case MIN_ENTRY: // Go through
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "INTEGER";
|
return "INTEGER";
|
||||||
|
|
||||||
|
|
|
@ -106,6 +106,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
||||||
@Override
|
@Override
|
||||||
public BoolMsg beginBatch(BeginBatchMessage message) {
|
public BoolMsg beginBatch(BeginBatchMessage message) {
|
||||||
try {
|
try {
|
||||||
|
init();
|
||||||
return bulletinBoard.beginBatch(message);
|
return bulletinBoard.beginBatch(message);
|
||||||
} catch (CommunicationException e) {
|
} catch (CommunicationException e) {
|
||||||
System.err.println(e.getMessage());
|
System.err.println(e.getMessage());
|
||||||
|
@ -120,6 +121,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
||||||
@Override
|
@Override
|
||||||
public BoolMsg postBatchMessage(BatchMessage batchMessage) {
|
public BoolMsg postBatchMessage(BatchMessage batchMessage) {
|
||||||
try {
|
try {
|
||||||
|
init();
|
||||||
return bulletinBoard.postBatchMessage(batchMessage);
|
return bulletinBoard.postBatchMessage(batchMessage);
|
||||||
} catch (CommunicationException e) {
|
} catch (CommunicationException e) {
|
||||||
System.err.println(e.getMessage());
|
System.err.println(e.getMessage());
|
||||||
|
@ -134,6 +136,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
||||||
@Override
|
@Override
|
||||||
public BoolMsg closeBatchMessage(CloseBatchMessage message) {
|
public BoolMsg closeBatchMessage(CloseBatchMessage message) {
|
||||||
try {
|
try {
|
||||||
|
init();
|
||||||
return bulletinBoard.closeBatchMessage(message);
|
return bulletinBoard.closeBatchMessage(message);
|
||||||
} catch (CommunicationException e) {
|
} catch (CommunicationException e) {
|
||||||
System.err.println(e.getMessage());
|
System.err.println(e.getMessage());
|
||||||
|
@ -146,8 +149,9 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
||||||
@Consumes(MEDIATYPE_PROTOBUF)
|
@Consumes(MEDIATYPE_PROTOBUF)
|
||||||
@Produces(MEDIATYPE_PROTOBUF)
|
@Produces(MEDIATYPE_PROTOBUF)
|
||||||
@Override
|
@Override
|
||||||
public List<BatchData> readBatch(BatchSpecificationMessage message) {
|
public BatchDataList readBatch(BatchSpecificationMessage message) {
|
||||||
try {
|
try {
|
||||||
|
init();
|
||||||
return bulletinBoard.readBatch(message);
|
return bulletinBoard.readBatch(message);
|
||||||
} catch (CommunicationException | IllegalArgumentException e) {
|
} catch (CommunicationException | IllegalArgumentException e) {
|
||||||
System.err.println(e.getMessage());
|
System.err.println(e.getMessage());
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package meerkat.bulletinboard;
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
@ -10,11 +11,6 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
|
|
||||||
public interface ClientCallback<T> {
|
|
||||||
void handleCallback(T msg);
|
|
||||||
void handleFailure(Throwable t);
|
|
||||||
}
|
|
||||||
|
|
||||||
public interface MessageHandler {
|
public interface MessageHandler {
|
||||||
void handleNewMessages(List<BulletinBoardMessage> messageList);
|
void handleNewMessages(List<BulletinBoardMessage> messageList);
|
||||||
}
|
}
|
||||||
|
@ -25,7 +21,7 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
* @param callback is a class containing methods to handle the result of the operation
|
* @param callback is a class containing methods to handle the result of the operation
|
||||||
* @return a unique message ID for the message, that can be later used to retrieve the batch
|
* @return a unique message ID for the message, that can be later used to retrieve the batch
|
||||||
*/
|
*/
|
||||||
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback);
|
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform an end-to-end post of a signed batch message
|
* Perform an end-to-end post of a signed batch message
|
||||||
|
@ -33,14 +29,14 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
* @param callback is a class containing methods to handle the result of the operation
|
* @param callback is a class containing methods to handle the result of the operation
|
||||||
* @return a unique identifier for the batch message
|
* @return a unique identifier for the batch message
|
||||||
*/
|
*/
|
||||||
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback);
|
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This message informs the server about the existence of a new batch message and supplies it with the tags associated with it
|
* This message informs the server about the existence of a new batch message and supplies it with the tags associated with it
|
||||||
* @param beginBatchMessage contains the data required to begin the batch
|
* @param beginBatchMessage contains the data required to begin the batch
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback);
|
public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method posts batch data into an (assumed to be open) batch
|
* This method posts batch data into an (assumed to be open) batch
|
||||||
|
@ -54,30 +50,30 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback);
|
int startPosition, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overloading of the postBatchData method which starts at the first position in the batch
|
* Overloading of the postBatchData method which starts at the first position in the batch
|
||||||
*/
|
*/
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overloading of the postBatchData method which uses ByteString
|
* Overloading of the postBatchData method which uses ByteString
|
||||||
*/
|
*/
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback);
|
int startPosition, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overloading of the postBatchData method which uses ByteString and starts at the first position in the batch
|
* Overloading of the postBatchData method which uses ByteString and starts at the first position in the batch
|
||||||
*/
|
*/
|
||||||
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempts to close a batch message
|
* Attempts to close a batch message
|
||||||
* @param closeBatchMessage contains the data required to close the batch
|
* @param closeBatchMessage contains the data required to close the batch
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback);
|
public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check how "safe" a given message is in an asynchronous manner
|
* Check how "safe" a given message is in an asynchronous manner
|
||||||
|
@ -85,7 +81,7 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
* @param id is the unique message identifier for retrieval
|
* @param id is the unique message identifier for retrieval
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void getRedundancy(MessageID id, ClientCallback<Float> callback);
|
public void getRedundancy(MessageID id, FutureCallback<Float> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read all messages posted matching the given filter in an asynchronous manner
|
* Read all messages posted matching the given filter in an asynchronous manner
|
||||||
|
@ -95,14 +91,14 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
* @param filterList return only messages that match the filters (null means no filtering).
|
* @param filterList return only messages that match the filters (null means no filtering).
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback);
|
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read a given batch message from the bulletin board
|
* Read a given batch message from the bulletin board
|
||||||
* @param batchSpecificationMessage contains the data required to specify a single batch instance
|
* @param batchSpecificationMessage contains the data required to specify a single batch instance
|
||||||
* @param callback is a callback class for handling the result of the operation
|
* @param callback is a callback class for handling the result of the operation
|
||||||
*/
|
*/
|
||||||
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback);
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback<CompleteBatch> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribes to a notifier that will return any new messages on the server that match the given filters
|
* Subscribes to a notifier that will return any new messages on the server that match the given filters
|
||||||
|
|
|
@ -75,7 +75,7 @@ public interface BulletinBoardServer{
|
||||||
* @throws CommunicationException on DB connection error
|
* @throws CommunicationException on DB connection error
|
||||||
* @throws IllegalArgumentException if message does not specify a batch
|
* @throws IllegalArgumentException if message does not specify a batch
|
||||||
*/
|
*/
|
||||||
public List<BatchData> readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException;
|
public BatchDataList readBatch(BatchSpecificationMessage message) throws CommunicationException, IllegalArgumentException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method closes the connection to the DB
|
* This method closes the connection to the DB
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meerkat.bulletinboard;
|
||||||
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Crypto.*;
|
import meerkat.protobuf.Crypto.*;
|
||||||
|
import meerkat.util.BulletinBoardMessageComparator;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -48,6 +49,14 @@ public class CompleteBatch {
|
||||||
return signature;
|
return signature;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CloseBatchMessage getCloseBatchMessage() {
|
||||||
|
return CloseBatchMessage.newBuilder()
|
||||||
|
.setBatchId(getBeginBatchMessage().getBatchId())
|
||||||
|
.setBatchLength(getBatchDataList().size())
|
||||||
|
.setSig(getSignature())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
public void setBeginBatchMessage(BeginBatchMessage beginBatchMessage) {
|
public void setBeginBatchMessage(BeginBatchMessage beginBatchMessage) {
|
||||||
this.beginBatchMessage = beginBatchMessage;
|
this.beginBatchMessage = beginBatchMessage;
|
||||||
}
|
}
|
||||||
|
@ -64,4 +73,45 @@ public class CompleteBatch {
|
||||||
signature = newSignature;
|
signature = newSignature;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
|
||||||
|
if (!(other instanceof CompleteBatch)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompleteBatch otherBatch = (CompleteBatch) other;
|
||||||
|
|
||||||
|
boolean result = true;
|
||||||
|
|
||||||
|
if (beginBatchMessage == null) {
|
||||||
|
if (otherBatch.getBeginBatchMessage() != null)
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
result = result && beginBatchMessage.equals(otherBatch.getBeginBatchMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (batchDataList == null) {
|
||||||
|
if (otherBatch.getBatchDataList() != null)
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
result = result && batchDataList.equals(otherBatch.getBatchDataList());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (signature == null) {
|
||||||
|
if (otherBatch.getSignature() != null)
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
result = result && signature.equals(otherBatch.getSignature());
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Batch " + beginBatchMessage.getSignerId().toString() + ":" + beginBatchMessage.getBatchId();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
package meerkat.util;
|
||||||
|
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 16-Feb-16.
|
||||||
|
*/
|
||||||
|
public class BulletinBoardUtils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Searches the tags in the message for one that begins with given prefix
|
||||||
|
* @param message is the message to search
|
||||||
|
* @param prefix is the given prefix
|
||||||
|
* @return the tag without the prefix, if found, or null if not found
|
||||||
|
*/
|
||||||
|
public static String findTagWithPrefix(BulletinBoardMessage message, String prefix) {
|
||||||
|
|
||||||
|
for (String tag : message.getMsg().getTagList()){
|
||||||
|
if (tag.startsWith(prefix)) {
|
||||||
|
return tag.substring(prefix.length());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Searches the tags in a message for tags that do not contain a given list of prefixes
|
||||||
|
* @param message is the message to search
|
||||||
|
* @param prefixes is the list of prefixes
|
||||||
|
* @return a list of the tags that do *not* contain any of the given prefixes
|
||||||
|
*/
|
||||||
|
public static List<String> removePrefixTags(BulletinBoardMessage message, Iterable<String> prefixes) {
|
||||||
|
|
||||||
|
if (prefixes == null)
|
||||||
|
return message.getMsg().getTagList();
|
||||||
|
|
||||||
|
List<String> result = new LinkedList<>();
|
||||||
|
|
||||||
|
for (String tag : message.getMsg().getTagList()){
|
||||||
|
|
||||||
|
boolean found = false;
|
||||||
|
|
||||||
|
for (String prefix : prefixes){
|
||||||
|
if (tag.startsWith(prefix)){
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!found) {
|
||||||
|
result.add(tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -50,13 +50,14 @@ enum FilterType {
|
||||||
MSG_ID = 0; // Match exact message ID
|
MSG_ID = 0; // Match exact message ID
|
||||||
EXACT_ENTRY = 1; // Match exact entry number in database (chronological)
|
EXACT_ENTRY = 1; // Match exact entry number in database (chronological)
|
||||||
MAX_ENTRY = 2; // Find all entries in database up to specified entry number (chronological)
|
MAX_ENTRY = 2; // Find all entries in database up to specified entry number (chronological)
|
||||||
SIGNER_ID = 3; // Find all entries in database that correspond to specific signature (signer)
|
MIN_ENTRY = 3; // Find all entries in database starting from specified entry number (chronological)
|
||||||
TAG = 4; // Find all entries in database that have a specific tag
|
SIGNER_ID = 4; // Find all entries in database that correspond to specific signature (signer)
|
||||||
|
TAG = 5; // Find all entries in database that have a specific tag
|
||||||
|
|
||||||
// NOTE: The MAX_MESSAGES filter must remain the last filter type
|
// NOTE: The MAX_MESSAGES filter must remain the last filter type
|
||||||
// This is because the condition it specifies in an SQL statement must come last in the statement
|
// This is because the condition it specifies in an SQL statement must come last in the statement
|
||||||
// Keeping it last here allows for easily sorting the filters and keeping the code general
|
// Keeping it last here allows for easily sorting the filters and keeping the code general
|
||||||
MAX_MESSAGES = 5; // Return at most some specified number of messages
|
MAX_MESSAGES = 6; // Return at most some specified number of messages
|
||||||
}
|
}
|
||||||
|
|
||||||
message MessageFilter {
|
message MessageFilter {
|
||||||
|
@ -98,6 +99,11 @@ message BatchData {
|
||||||
bytes data = 1;
|
bytes data = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List of BatchData; Only used for testing
|
||||||
|
message BatchDataList {
|
||||||
|
repeated BatchData data = 1;
|
||||||
|
}
|
||||||
|
|
||||||
// These messages comprise a batch message
|
// These messages comprise a batch message
|
||||||
message BatchMessage {
|
message BatchMessage {
|
||||||
bytes signerId = 1; // Unique signer identifier
|
bytes signerId = 1; // Unique signer identifier
|
||||||
|
|
Loading…
Reference in New Issue