First (untested) version of BB Client with full batch support
parent
141d286af2
commit
3fed32f9e6
|
@ -0,0 +1,25 @@
|
||||||
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BatchData;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 17-Jan-16.
|
||||||
|
* Used to store the complete data required for sending a batch data list inside a single object
|
||||||
|
*/
|
||||||
|
public class BatchDataContainer {
|
||||||
|
|
||||||
|
public final byte[] signerId;
|
||||||
|
public final int batchId;
|
||||||
|
public final List<BatchData> batchDataList;
|
||||||
|
public final int startPosition;
|
||||||
|
|
||||||
|
public BatchDataContainer(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition) {
|
||||||
|
this.signerId = signerId;
|
||||||
|
this.batchId = batchId;
|
||||||
|
this.batchDataList = batchDataList;
|
||||||
|
this.startPosition = startPosition;
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,9 +5,7 @@ import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import meerkat.bulletinboard.workers.SingleServerGetRedundancyWorker;
|
import meerkat.bulletinboard.workers.singleserver.*;
|
||||||
import meerkat.bulletinboard.workers.SingleServerPostMessageWorker;
|
|
||||||
import meerkat.bulletinboard.workers.SingleServerReadMessagesWorker;
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||||
|
|
||||||
|
@ -30,6 +28,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
|
|
||||||
protected ListeningScheduledExecutorService executorService;
|
protected ListeningScheduledExecutorService executorService;
|
||||||
|
|
||||||
|
protected BatchDigest batchDigest;
|
||||||
|
|
||||||
private long lastServerErrorTime;
|
private long lastServerErrorTime;
|
||||||
|
|
||||||
protected final long failDelayInMilliseconds;
|
protected final long failDelayInMilliseconds;
|
||||||
|
@ -136,6 +136,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
// Perform usual setup
|
// Perform usual setup
|
||||||
super.init(clientParams);
|
super.init(clientParams);
|
||||||
|
|
||||||
|
// Wrap the Digest into a BatchDigest
|
||||||
|
batchDigest = new GenericBatchDigest(digest);
|
||||||
|
|
||||||
// Remove all but first DB address
|
// Remove all but first DB address
|
||||||
String dbAddress = meerkatDBs.get(0);
|
String dbAddress = meerkatDBs.get(0);
|
||||||
meerkatDBs = new LinkedList<String>();
|
meerkatDBs = new LinkedList<String>();
|
||||||
|
@ -153,19 +156,124 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
scheduleWorker(worker, new RetryCallback(worker, callback));
|
scheduleWorker(worker, new RetryCallback(worker, callback));
|
||||||
|
|
||||||
// Calculate the correct message ID and return it
|
// Calculate the correct message ID and return it
|
||||||
digest.reset();
|
batchDigest.reset();
|
||||||
digest.update(msg.getMsg());
|
batchDigest.update(msg.getMsg());
|
||||||
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
|
return batchDigest.digestAsMessageID();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class PostBatchDataCallback implements ClientCallback<Boolean> {
|
||||||
|
|
||||||
|
private CompleteBatch completeBatch;
|
||||||
|
ClientCallback<Boolean> callback;
|
||||||
|
|
||||||
|
public PostBatchDataCallback(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
||||||
|
this.completeBatch = completeBatch;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleCallback(Boolean msg) {
|
||||||
|
closeBatch(
|
||||||
|
CloseBatchMessage.newBuilder()
|
||||||
|
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId())
|
||||||
|
.setSig(completeBatch.getSignature())
|
||||||
|
.setBatchLength(completeBatch.getBatchDataList().size())
|
||||||
|
.build(),
|
||||||
|
callback
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleFailure(Throwable t) {
|
||||||
|
callback.handleFailure(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private class BeginBatchCallback implements ClientCallback<Boolean> {
|
||||||
|
|
||||||
|
private CompleteBatch completeBatch;
|
||||||
|
ClientCallback<Boolean> callback;
|
||||||
|
|
||||||
|
public BeginBatchCallback(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
||||||
|
this.completeBatch = completeBatch;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleCallback(Boolean msg) {
|
||||||
|
|
||||||
|
postBatchData(
|
||||||
|
completeBatch.getBeginBatchMessage().getSignerId(),
|
||||||
|
completeBatch.getBeginBatchMessage().getBatchId(),
|
||||||
|
completeBatch.getBatchDataList(),
|
||||||
|
0,
|
||||||
|
new PostBatchDataCallback(completeBatch,callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleFailure(Throwable t) {
|
||||||
|
callback.handleFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
||||||
return null;
|
|
||||||
|
beginBatch(
|
||||||
|
completeBatch.getBeginBatchMessage(),
|
||||||
|
new BeginBatchCallback(completeBatch, callback)
|
||||||
|
);
|
||||||
|
|
||||||
|
batchDigest.update(completeBatch);
|
||||||
|
|
||||||
|
return batchDigest.digestAsMessageID();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback) {
|
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
|
SingleServerBeginBatchWorker worker =
|
||||||
|
new SingleServerBeginBatchWorker(meerkatDBs.get(0), beginBatchMessage, MAX_RETRIES);
|
||||||
|
|
||||||
|
// Submit worker and create callback
|
||||||
|
scheduleWorker(worker, new RetryCallback(worker, callback));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
|
int startPosition, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
BatchMessage.Builder builder = BatchMessage.newBuilder()
|
||||||
|
.setSignerId(signerId)
|
||||||
|
.setBatchId(batchId);
|
||||||
|
|
||||||
|
// Iterate through data list
|
||||||
|
|
||||||
|
for (BatchData data : batchDataList) {
|
||||||
|
builder.setSerialNum(startPosition).setData(data);
|
||||||
|
|
||||||
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
|
SingleServerPostBatchWorker worker =
|
||||||
|
new SingleServerPostBatchWorker(meerkatDBs.get(0), builder.build(), MAX_RETRIES);
|
||||||
|
|
||||||
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
|
scheduleWorker(worker, new RetryCallback(worker, callback));
|
||||||
|
|
||||||
|
// Increment position in batch
|
||||||
|
startPosition++;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,15 +281,26 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback) {
|
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
// Create worker with redundancy 1 and MAX_RETRIES retries
|
||||||
|
SingleServerCloseBatchWorker worker =
|
||||||
|
new SingleServerCloseBatchWorker(meerkatDBs.get(0), closeBatchMessage, MAX_RETRIES);
|
||||||
|
|
||||||
|
// Submit worker and create callback
|
||||||
|
scheduleWorker(worker, new RetryCallback(worker, callback));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +327,13 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readBatch(byte[] signerId, int batchId, ClientCallback<CompleteBatch> callback) {
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback) {
|
||||||
|
|
||||||
|
// Create job with no retries
|
||||||
|
SingleServerReadBatchWorker worker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchSpecificationMessage, 1);
|
||||||
|
|
||||||
|
// Submit job and create callback
|
||||||
|
scheduleWorker(worker, new RetryCallback(worker, callback));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,9 +2,7 @@ package meerkat.bulletinboard;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
import meerkat.bulletinboard.workers.MultiServerGetRedundancyWorker;
|
import meerkat.bulletinboard.workers.multiserver.*;
|
||||||
import meerkat.bulletinboard.workers.MultiServerPostMessageWorker;
|
|
||||||
import meerkat.bulletinboard.workers.MultiServerReadMessagesWorker;
|
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Voting.*;
|
import meerkat.protobuf.Voting.*;
|
||||||
|
@ -31,6 +29,8 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
// Per-server clients
|
// Per-server clients
|
||||||
List<SingleServerBulletinBoardClient> clients;
|
List<SingleServerBulletinBoardClient> clients;
|
||||||
|
|
||||||
|
BatchDigest batchDigest;
|
||||||
|
|
||||||
private final static int POST_MESSAGE_RETRY_NUM = 3;
|
private final static int POST_MESSAGE_RETRY_NUM = 3;
|
||||||
private final static int READ_MESSAGES_RETRY_NUM = 1;
|
private final static int READ_MESSAGES_RETRY_NUM = 1;
|
||||||
private final static int GET_REDUNDANCY_RETRY_NUM = 1;
|
private final static int GET_REDUNDANCY_RETRY_NUM = 1;
|
||||||
|
@ -51,6 +51,8 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
|
|
||||||
super.init(clientParams);
|
super.init(clientParams);
|
||||||
|
|
||||||
|
batchDigest = new GenericBatchDigest(digest);
|
||||||
|
|
||||||
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount());
|
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount());
|
||||||
|
|
||||||
executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM);
|
executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM);
|
||||||
|
@ -84,42 +86,88 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
executorService.submit(worker);
|
executorService.submit(worker);
|
||||||
|
|
||||||
// Calculate the correct message ID and return it
|
// Calculate the correct message ID and return it
|
||||||
digest.reset();
|
batchDigest.reset();
|
||||||
digest.update(msg.getMsg());
|
batchDigest.update(msg.getMsg());
|
||||||
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
|
return batchDigest.digestAsMessageID();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
return null; // TODO: write this
|
// Create job
|
||||||
|
MultiServerPostBatchWorker worker =
|
||||||
|
new MultiServerPostBatchWorker(clients, minAbsoluteRedundancy, completeBatch, POST_MESSAGE_RETRY_NUM, callback);
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
executorService.submit(worker);
|
||||||
|
|
||||||
|
// Calculate the correct message ID and return it
|
||||||
|
batchDigest.reset();
|
||||||
|
batchDigest.update(completeBatch);
|
||||||
|
return batchDigest.digestAsMessageID();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback) {
|
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback) {
|
||||||
// TODO: write this
|
|
||||||
|
// Create job
|
||||||
|
MultiServerBeginBatchWorker worker =
|
||||||
|
new MultiServerBeginBatchWorker(clients, minAbsoluteRedundancy, beginBatchMessage, POST_MESSAGE_RETRY_NUM, callback);
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
executorService.submit(worker);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback) {
|
int startPosition, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
// TODO: write this
|
BatchDataContainer batchDataContainer = new BatchDataContainer(signerId, batchId, batchDataList, startPosition);
|
||||||
|
|
||||||
|
// Create job
|
||||||
|
MultiServerPostBatchDataWorker worker =
|
||||||
|
new MultiServerPostBatchDataWorker(clients, minAbsoluteRedundancy, batchDataContainer, POST_MESSAGE_RETRY_NUM, callback);
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
executorService.submit(worker);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
postBatchData(signerId, batchId, batchDataList, 0, callback); // Write batch from beginning
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback) {
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
// TODO: write this
|
int startPosition, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
postBatchData(signerId.toByteArray(), batchId, batchDataList, startPosition, callback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
postBatchData(signerId, batchId, batchDataList, 0, callback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback) {
|
||||||
|
|
||||||
|
// Create job
|
||||||
|
MultiServerCloseBatchWorker worker =
|
||||||
|
new MultiServerCloseBatchWorker(clients, minAbsoluteRedundancy, closeBatchMessage, POST_MESSAGE_RETRY_NUM, callback);
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
executorService.submit(worker);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -158,8 +206,15 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readBatch(byte[] signerId, int batchId, ClientCallback<CompleteBatch> callback) {
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback) {
|
||||||
// TODO: Implement
|
|
||||||
|
// Create job
|
||||||
|
MultiServerReadBatchWorker worker =
|
||||||
|
new MultiServerReadBatchWorker(clients, minAbsoluteRedundancy, batchSpecificationMessage, READ_MESSAGES_RETRY_NUM, callback);
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
executorService.submit(worker);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
package meerkat.bulletinboard.workers;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
|
||||||
*/
|
|
||||||
public class MultiServerPostBatchWorker {
|
|
||||||
}
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerBeginBatchWorker extends MultiServerGenericPostWorker<BeginBatchMessage> {
|
||||||
|
|
||||||
|
public MultiServerBeginBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, BeginBatchMessage payload, int maxRetry,
|
||||||
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(SingleServerBulletinBoardClient client, BeginBatchMessage payload) {
|
||||||
|
client.beginBatch(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker<CloseBatchMessage> {
|
||||||
|
|
||||||
|
public MultiServerCloseBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, CloseBatchMessage payload, int maxRetry,
|
||||||
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(SingleServerBulletinBoardClient client, CloseBatchMessage payload) {
|
||||||
|
client.closeBatch(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -1,10 +1,9 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
|
||||||
|
|
||||||
import javax.ws.rs.client.WebTarget;
|
import javax.ws.rs.client.WebTarget;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
@ -14,16 +13,18 @@ import java.util.List;
|
||||||
/**
|
/**
|
||||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
*/
|
*/
|
||||||
public class MultiServerPostMessageWorker extends MultiServerWorker<BulletinBoardMessage, Boolean> {
|
public abstract class MultiServerGenericPostWorker<T> extends MultiServerWorker<T, Boolean> {
|
||||||
|
|
||||||
public MultiServerPostMessageWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerGenericPostWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, BulletinBoardMessage payload, int maxRetry,
|
int minServers, T payload, int maxRetry,
|
||||||
ClientCallback<Boolean> clientCallback) {
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
super(clients, minServers, payload, maxRetry, clientCallback);
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void doPost(SingleServerBulletinBoardClient client, T payload);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method carries out the actual communication with the servers via HTTP Post
|
* This method carries out the actual communication with the servers via HTTP Post
|
||||||
* It accesses the servers one by one and tries to post the payload to each in turn
|
* It accesses the servers one by one and tries to post the payload to each in turn
|
||||||
|
@ -48,7 +49,7 @@ public class MultiServerPostMessageWorker extends MultiServerWorker<BulletinBoar
|
||||||
// Send request to Server
|
// Send request to Server
|
||||||
SingleServerBulletinBoardClient client = clientIterator.next();
|
SingleServerBulletinBoardClient client = clientIterator.next();
|
||||||
|
|
||||||
client.postMessage(payload, this);
|
doPost(client, payload);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
||||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -13,13 +12,13 @@ import java.util.List;
|
||||||
/**
|
/**
|
||||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
*/
|
*/
|
||||||
public class MultiServerReadMessagesWorker extends MultiServerWorker<MessageFilterList,List<BulletinBoardMessage>>{
|
public abstract class MultiServerGenericReadWorker<IN, OUT> extends MultiServerWorker<IN, OUT>{
|
||||||
|
|
||||||
private Iterator<SingleServerBulletinBoardClient> clientIterator;
|
protected Iterator<SingleServerBulletinBoardClient> clientIterator;
|
||||||
|
|
||||||
public MultiServerReadMessagesWorker(List<SingleServerBulletinBoardClient> clients,
|
public MultiServerGenericReadWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
int minServers, MessageFilterList payload, int maxRetry,
|
int minServers, IN payload, int maxRetry,
|
||||||
ClientCallback<List<BulletinBoardMessage>> clientCallback) {
|
ClientCallback<OUT> clientCallback) {
|
||||||
|
|
||||||
super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
|
super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
|
||||||
|
|
||||||
|
@ -27,6 +26,8 @@ public class MultiServerReadMessagesWorker extends MultiServerWorker<MessageFilt
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void doRead(IN payload, SingleServerBulletinBoardClient client);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method carries out the actual communication with the servers via HTTP Post
|
* This method carries out the actual communication with the servers via HTTP Post
|
||||||
* It accesses the servers in a random order until one answers it
|
* It accesses the servers in a random order until one answers it
|
||||||
|
@ -44,7 +45,7 @@ public class MultiServerReadMessagesWorker extends MultiServerWorker<MessageFilt
|
||||||
SingleServerBulletinBoardClient client = clientIterator.next();
|
SingleServerBulletinBoardClient client = clientIterator.next();
|
||||||
|
|
||||||
// Retrieve answer
|
// Retrieve answer
|
||||||
client.readMessages(payload,this);
|
doRead(payload, client);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
fail(new CommunicationException("Could not contact any server"));
|
fail(new CommunicationException("Could not contact any server"));
|
||||||
|
@ -53,7 +54,7 @@ public class MultiServerReadMessagesWorker extends MultiServerWorker<MessageFilt
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleCallback(List<BulletinBoardMessage> msg) {
|
public void handleCallback(OUT msg) {
|
||||||
succeed(msg);
|
succeed(msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
import meerkat.bulletinboard.MultiServerWorker;
|
import meerkat.bulletinboard.MultiServerWorker;
|
|
@ -0,0 +1,28 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.bulletinboard.BatchDataContainer;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerPostBatchDataWorker extends MultiServerGenericPostWorker<BatchDataContainer> {
|
||||||
|
|
||||||
|
public MultiServerPostBatchDataWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, BatchDataContainer payload, int maxRetry,
|
||||||
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(SingleServerBulletinBoardClient client, BatchDataContainer payload) {
|
||||||
|
client.postBatchData(payload.signerId, payload.batchId, payload.batchDataList, payload.startPosition, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerPostBatchWorker extends MultiServerGenericPostWorker<CompleteBatch> {
|
||||||
|
|
||||||
|
public MultiServerPostBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, CompleteBatch payload, int maxRetry,
|
||||||
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(SingleServerBulletinBoardClient client, CompleteBatch payload) {
|
||||||
|
client.postBatch(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerPostMessageWorker extends MultiServerGenericPostWorker<BulletinBoardMessage> {
|
||||||
|
|
||||||
|
public MultiServerPostMessageWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, BulletinBoardMessage payload, int maxRetry,
|
||||||
|
ClientCallback<Boolean> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doPost(SingleServerBulletinBoardClient client, BulletinBoardMessage payload) {
|
||||||
|
client.postMessage(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BatchSpecificationMessage;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker<BatchSpecificationMessage, CompleteBatch> {
|
||||||
|
|
||||||
|
public MultiServerReadBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, BatchSpecificationMessage payload, int maxRetry,
|
||||||
|
ClientCallback<CompleteBatch> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doRead(BatchSpecificationMessage payload, SingleServerBulletinBoardClient client) {
|
||||||
|
client.readBatch(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package meerkat.bulletinboard.workers.multiserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
|
||||||
|
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class MultiServerReadMessagesWorker extends MultiServerGenericReadWorker<MessageFilterList,List<BulletinBoardMessage>>{
|
||||||
|
|
||||||
|
public MultiServerReadMessagesWorker(List<SingleServerBulletinBoardClient> clients,
|
||||||
|
int minServers, MessageFilterList payload, int maxRetry,
|
||||||
|
ClientCallback<List<BulletinBoardMessage>> clientCallback) {
|
||||||
|
|
||||||
|
super(clients, minServers, payload, maxRetry, clientCallback);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doRead(MessageFilterList payload, SingleServerBulletinBoardClient client) {
|
||||||
|
client.readMessages(payload, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.BEGIN_BATCH_PATH;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
* Tries to contact server once and perform a post operation
|
||||||
|
*/
|
||||||
|
public class SingleServerBeginBatchWorker extends SingleServerGenericPostWorker<BeginBatchMessage> {
|
||||||
|
|
||||||
|
public SingleServerBeginBatchWorker(String serverAddress, BeginBatchMessage payload, int maxRetry) {
|
||||||
|
super(serverAddress, BEGIN_BATCH_PATH, payload, maxRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.CloseBatchMessage;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.CLOSE_BATCH_PATH;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
* Tries to contact server once and perform a close batch operation
|
||||||
|
*/
|
||||||
|
public class SingleServerCloseBatchWorker extends SingleServerGenericPostWorker<CloseBatchMessage> {
|
||||||
|
|
||||||
|
public SingleServerCloseBatchWorker(String serverAddress, CloseBatchMessage payload, int maxRetry) {
|
||||||
|
super(serverAddress, CLOSE_BATCH_PATH, payload, maxRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,9 +1,8 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.SingleServerWorker;
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BoolMsg;
|
import meerkat.protobuf.BulletinBoardAPI.BoolMsg;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
|
||||||
import meerkat.rest.Constants;
|
import meerkat.rest.Constants;
|
||||||
|
|
||||||
import javax.ws.rs.ProcessingException;
|
import javax.ws.rs.ProcessingException;
|
||||||
|
@ -13,16 +12,18 @@ import javax.ws.rs.client.WebTarget;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
* Tries to contact server once and perform a post operation
|
* Tries to contact server once and perform a post operation
|
||||||
*/
|
*/
|
||||||
public class SingleServerPostMessageWorker extends SingleServerWorker<BulletinBoardMessage, Boolean> {
|
public class SingleServerGenericPostWorker<T> extends SingleServerWorker<T, Boolean> {
|
||||||
|
|
||||||
public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) {
|
private String subPath;
|
||||||
|
|
||||||
|
public SingleServerGenericPostWorker(String serverAddress, String subPath, T payload, int maxRetry) {
|
||||||
super(serverAddress, payload, maxRetry);
|
super(serverAddress, payload, maxRetry);
|
||||||
|
this.subPath = subPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -30,13 +31,13 @@ public class SingleServerPostMessageWorker extends SingleServerWorker<BulletinBo
|
||||||
* It accesses the server and tries to post the payload to it
|
* It accesses the server and tries to post the payload to it
|
||||||
* Successful post to a server results
|
* Successful post to a server results
|
||||||
* @return TRUE if the operation is successful
|
* @return TRUE if the operation is successful
|
||||||
* @throws CommunicationException if the operation is unseccessful
|
* @throws CommunicationException if the operation is unsuccessful
|
||||||
*/
|
*/
|
||||||
public Boolean call() throws CommunicationException{
|
public Boolean call() throws CommunicationException{
|
||||||
|
|
||||||
Client client = clientLocal.get();
|
Client client = clientLocal.get();
|
||||||
|
|
||||||
WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH);;
|
WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(subPath);;
|
||||||
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
||||||
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.SingleServerWorker;
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
|
@ -0,0 +1,17 @@
|
||||||
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BatchMessage;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.POST_BATCH_PATH;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
* Tries to contact server once and perform a post batch operation
|
||||||
|
*/
|
||||||
|
public class SingleServerPostBatchWorker extends SingleServerGenericPostWorker<BatchMessage> {
|
||||||
|
|
||||||
|
public SingleServerPostBatchWorker(String serverAddress, BatchMessage payload, int maxRetry) {
|
||||||
|
super(serverAddress, POST_BATCH_PATH, payload, maxRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
* Tries to contact server once and perform a post operation
|
||||||
|
*/
|
||||||
|
public class SingleServerPostMessageWorker extends SingleServerGenericPostWorker<BulletinBoardMessage> {
|
||||||
|
|
||||||
|
public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) {
|
||||||
|
super(serverAddress, POST_MESSAGE_PATH, payload, maxRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,119 @@
|
||||||
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
|
import meerkat.comm.CommunicationException;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
import meerkat.rest.Constants;
|
||||||
|
|
||||||
|
import javax.ws.rs.ProcessingException;
|
||||||
|
import javax.ws.rs.client.Client;
|
||||||
|
import javax.ws.rs.client.Entity;
|
||||||
|
import javax.ws.rs.client.WebTarget;
|
||||||
|
import javax.ws.rs.core.GenericType;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH;
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.READ_BATCH_PATH;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardConstants.BATCH_ID_TAG_PREFIX;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||||
|
*/
|
||||||
|
public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecificationMessage, CompleteBatch> {
|
||||||
|
|
||||||
|
public SingleServerReadBatchWorker(String serverAddress, BatchSpecificationMessage payload, int maxRetry) {
|
||||||
|
super(serverAddress, payload, maxRetry);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method carries out the actual communication with the server via HTTP Post
|
||||||
|
* Upon successful retrieval from the server the method returns the received values
|
||||||
|
* @return the complete batch as read from the server
|
||||||
|
* @throws CommunicationException if the server's response is invalid
|
||||||
|
*/
|
||||||
|
public CompleteBatch call() throws CommunicationException{
|
||||||
|
|
||||||
|
CompleteBatch completeBatch = new CompleteBatch();
|
||||||
|
|
||||||
|
Client client = clientLocal.get();
|
||||||
|
|
||||||
|
WebTarget webTarget;
|
||||||
|
Response response;
|
||||||
|
|
||||||
|
// Set filters for the batch message metadata retrieval
|
||||||
|
|
||||||
|
MessageFilterList messageFilterList = MessageFilterList.newBuilder()
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.TAG)
|
||||||
|
.setTag(BATCH_ID_TAG_PREFIX + String.valueOf(payload.getBatchId()))
|
||||||
|
.build())
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.SIGNER_ID)
|
||||||
|
.setId(payload.getSignerId())
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Send request to Server
|
||||||
|
|
||||||
|
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
|
||||||
|
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
||||||
|
Entity.entity(messageFilterList, Constants.MEDIATYPE_PROTOBUF));
|
||||||
|
|
||||||
|
// Retrieve answer
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
// If a BulletinBoardMessageList is returned: the read was successful
|
||||||
|
BulletinBoardMessage metadata = response.readEntity(BulletinBoardMessageList.class).getMessage(0);
|
||||||
|
|
||||||
|
completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder()
|
||||||
|
.setSignerId(payload.getSignerId())
|
||||||
|
.setBatchId(payload.getBatchId())
|
||||||
|
.addAllTag(metadata.getMsg().getTagList())
|
||||||
|
.build());
|
||||||
|
|
||||||
|
completeBatch.setSignature(metadata.getSig(0));
|
||||||
|
|
||||||
|
} catch (ProcessingException | IllegalStateException e) {
|
||||||
|
|
||||||
|
// Read failed
|
||||||
|
throw new CommunicationException("Could not contact the server");
|
||||||
|
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
response.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the batch data
|
||||||
|
|
||||||
|
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH);
|
||||||
|
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
||||||
|
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
||||||
|
|
||||||
|
// Retrieve answer
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
// If a List of BatchData is returned: the read was successful
|
||||||
|
|
||||||
|
completeBatch.appendBatchData(response.readEntity(new GenericType<List<BatchData>>(){}));
|
||||||
|
|
||||||
|
} catch (ProcessingException | IllegalStateException e) {
|
||||||
|
|
||||||
|
// Read failed
|
||||||
|
throw new CommunicationException("Could not contact the server");
|
||||||
|
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
response.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return completeBatch;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package meerkat.bulletinboard.workers;
|
package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.SingleServerWorker;
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
|
@ -141,6 +141,10 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Path(READ_BATCH_PATH)
|
||||||
|
@POST
|
||||||
|
@Consumes(MEDIATYPE_PROTOBUF)
|
||||||
|
@Produces(MEDIATYPE_PROTOBUF)
|
||||||
@Override
|
@Override
|
||||||
public List<BatchData> readBatch(BatchSpecificationMessage message) {
|
public List<BatchData> readBatch(BatchSpecificationMessage message) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package meerkat.bulletinboard;
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Crypto.Signature;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -37,36 +37,47 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This message informs the server about the existence of a new batch message and supplies it with the tags associated with it
|
* This message informs the server about the existence of a new batch message and supplies it with the tags associated with it
|
||||||
* @param signerId is the canonical form for the ID of the sender of this batch
|
* @param beginBatchMessage contains the data required to begin the batch
|
||||||
* @param batchId is a unique (per signer) ID for this batch
|
* @param callback is a callback function class for handling results of the operation
|
||||||
* @param tagList is a list of tags that belong to the batch message
|
|
||||||
*/
|
*/
|
||||||
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback);
|
public void beginBatch(BeginBatchMessage beginBatchMessage, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method posts batch data into an (assumed to be open) batch
|
* This method posts batch data into an (assumed to be open) batch
|
||||||
* It does not close the batch
|
* It does not close the batch
|
||||||
* @param signerId is the canonical form for the ID of the sender of this batch
|
* @param signerId is the canonical form for the ID of the sender of this batch
|
||||||
* @param batchId is a unique (per signer) ID for this batch
|
* @param batchId is a unique (per signer) ID for this batch
|
||||||
* @param batchDataList is the (canonically ordered) list of data comprising the entire batch message (not just the portion to be written)
|
* @param batchDataList is the (canonically ordered) list of data comprising the portion of the batch to be posted
|
||||||
* @param startPosition is the location (in the batch) of the first entry in batchDataList
|
* @param startPosition is the location (in the batch) of the first entry in batchDataList
|
||||||
* (optionally used to continue interrupted post operations)
|
* (optionally used to continue interrupted post operations)
|
||||||
|
* The first position in the batch is position 0
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
|
||||||
int startPosition, ClientCallback<Boolean> callback);
|
int startPosition, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overloading of the postBatchData method in which startPosition is set to the default value 0
|
* Overloading of the postBatchData method which starts at the first position in the batch
|
||||||
*/
|
*/
|
||||||
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
|
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overloading of the postBatchData method which uses ByteString
|
||||||
|
*/
|
||||||
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList,
|
||||||
|
int startPosition, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overloading of the postBatchData method which uses ByteString and starts at the first position in the batch
|
||||||
|
*/
|
||||||
|
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempts to close a batch message
|
* Attempts to close a batch message
|
||||||
* @param closeBatchMessage contains the data required to close the batch
|
* @param closeBatchMessage contains the data required to close the batch
|
||||||
* @param callback is a callback function class for handling results of the operation
|
* @param callback is a callback function class for handling results of the operation
|
||||||
*/
|
*/
|
||||||
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback);
|
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<Boolean> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check how "safe" a given message is in an asynchronous manner
|
* Check how "safe" a given message is in an asynchronous manner
|
||||||
|
@ -88,11 +99,10 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read a given batch message from the bulletin board
|
* Read a given batch message from the bulletin board
|
||||||
* @param signerId is the ID of the signer (sender) of the batch message
|
* @param batchSpecificationMessage contains the data required to specify a single batch instance
|
||||||
* @param batchId is the unique (per signer) ID of the batch
|
|
||||||
* @param callback is a callback class for handling the result of the operation
|
* @param callback is a callback class for handling the result of the operation
|
||||||
*/
|
*/
|
||||||
public void readBatch(byte[] signerId, int batchId, ClientCallback<CompleteBatch> callback);
|
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, ClientCallback<CompleteBatch> callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribes to a notifier that will return any new messages on the server that match the given filters
|
* Subscribes to a notifier that will return any new messages on the server that match the given filters
|
||||||
|
|
|
@ -9,6 +9,7 @@ public interface BulletinBoardConstants {
|
||||||
|
|
||||||
public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver";
|
public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver";
|
||||||
public static final String READ_MESSAGES_PATH = "/readmessages";
|
public static final String READ_MESSAGES_PATH = "/readmessages";
|
||||||
|
public static final String READ_BATCH_PATH = "/readbatch";
|
||||||
public static final String POST_MESSAGE_PATH = "/postmessage";
|
public static final String POST_MESSAGE_PATH = "/postmessage";
|
||||||
public static final String BEGIN_BATCH_PATH = "/beginbatch";
|
public static final String BEGIN_BATCH_PATH = "/beginbatch";
|
||||||
public static final String POST_BATCH_PATH = "/postbatch";
|
public static final String POST_BATCH_PATH = "/postbatch";
|
||||||
|
@ -17,6 +18,6 @@ public interface BulletinBoardConstants {
|
||||||
// Other Constants
|
// Other Constants
|
||||||
|
|
||||||
public static final String BATCH_TAG = "@BATCH";
|
public static final String BATCH_TAG = "@BATCH";
|
||||||
public static final String BATCH_ID_TAG_PREFIX = "#";
|
public static final String BATCH_ID_TAG_PREFIX = "BATCHID#";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ package meerkat.bulletinboard;
|
||||||
|
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import meerkat.crypto.Digest;
|
import meerkat.crypto.Digest;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BeginBatchMessage;
|
import meerkat.protobuf.BulletinBoardAPI.MessageID;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BatchData;
|
import meerkat.protobuf.BulletinBoardAPI.BatchData;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -36,6 +36,11 @@ public class GenericBatchDigest implements BatchDigest{
|
||||||
return digest.digest();
|
return digest.digest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageID digestAsMessageID() {
|
||||||
|
return digest.digestAsMessageID();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void update(Message msg) {
|
public void update(Message msg) {
|
||||||
digest.update(msg);
|
digest.update(msg);
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meerkat.crypto;
|
package meerkat.crypto;
|
||||||
|
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.MessageID;
|
||||||
|
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
|
|
||||||
|
@ -15,6 +16,12 @@ public interface Digest {
|
||||||
*/
|
*/
|
||||||
public byte[] digest();
|
public byte[] digest();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Completes the hash computation and returns a MessageID Protobuf as output
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public MessageID digestAsMessageID();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the digest using the specified message (in serialized wire form)
|
* Updates the digest using the specified message (in serialized wire form)
|
||||||
*
|
*
|
||||||
|
|
|
@ -3,6 +3,7 @@ package meerkat.crypto.concrete;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import meerkat.crypto.Digest;
|
import meerkat.crypto.Digest;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.MessageID;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -60,6 +61,11 @@ public class SHA256Digest implements Digest {
|
||||||
return hash.digest();
|
return hash.digest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageID digestAsMessageID() {
|
||||||
|
return MessageID.newBuilder().setID(ByteString.copyFrom(digest())).build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void update(Message msg) {
|
public void update(Message msg) {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue