Working client-side Batch changes

Cached-Client
Arbel Deutsch Peled 2016-06-26 13:06:16 +03:00
parent 1951db546d
commit d1f7413cde
16 changed files with 292 additions and 403 deletions

View File

@ -3,9 +3,7 @@ package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.Timestamp;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm;
import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Voting.*;
@ -267,9 +265,9 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
}
@Override
public void readBatch(final MessageID msgID, final FutureCallback<BulletinBoardMessage> callback) {
public void readMessage(final MessageID msgID, final FutureCallback<BulletinBoardMessage> callback) {
localClient.readBatch(msgID, new FutureCallback<BulletinBoardMessage>() {
localClient.readMessage(msgID, new FutureCallback<BulletinBoardMessage>() {
@Override
public void onSuccess(BulletinBoardMessage result) {
@ -282,7 +280,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
// Read from local unsuccessful: try to read from remote
remoteClient.readBatch(msgID, new FutureCallback<BulletinBoardMessage>() {
remoteClient.readMessage(msgID, new FutureCallback<BulletinBoardMessage>() {
@Override
public void onSuccess(BulletinBoardMessage result) {
@ -398,17 +396,17 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
}
@Override
public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException {
public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException {
BulletinBoardMessage result = null;
try {
result = localClient.readBatch(msgID);
result = localClient.readMessage(msgID);
} catch (CommunicationException e) {
//TODO: log
}
if (result == null){
result = remoteClient.readBatch(msgID);
result = remoteClient.readMessage(msgID);
if (result != null){
localClient.postMessage(result);

View File

@ -183,6 +183,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
batchId.setLength(i);
return true;
}
@ -244,6 +246,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder()
.setBatchId(identifier.getBatchId().getValue())
.setBatchLength(identifier.getLength())
.setTimestamp(timestamp)
.addAllSig(signatures)
.build();
@ -438,7 +441,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
@Override
public BulletinBoardMessage call() throws Exception {
// Read message stub
// Read message (mat be a stub)
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
@ -451,19 +454,25 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
List<BulletinBoardMessage> bulletinBoardMessages = messageReader.call();
if (bulletinBoardMessages.size() <= 0) {
throw new NotFoundException("Batch does not exist");
throw new NotFoundException("Message does not exist");
}
BulletinBoardMessage stub = bulletinBoardMessages.get(0);
BulletinBoardMessage msg = bulletinBoardMessages.get(0);
// Read data
if (msg.getMsg().getDataTypeCase() == UnsignedBulletinBoardMessage.DataTypeCase.MSGID) {
BatchDataReader batchDataReader = new BatchDataReader(msgID);
List<BatchChunk> batchChunkList = batchDataReader.call();
// Read data
// Combine and return
BatchDataReader batchDataReader = new BatchDataReader(msgID);
List<BatchChunk> batchChunkList = batchDataReader.call();
return BulletinBoardUtils.gatherBatch(stub, batchChunkList);
// Combine and return
return BulletinBoardUtils.gatherBatch(msg, batchChunkList);
} else {
return msg;
}
}
@ -493,7 +502,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
@Override
public void readBatch(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
public void readMessage(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
Futures.addCallback(executorService.submit(new CompleteBatchReader(msgID)), callback);
}
@ -591,7 +600,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
@Override
public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException {
public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException {
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
@ -617,20 +626,14 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
throw new IllegalArgumentException("Message is not a stub and does not contain the required message ID");
}
MessageID msgID = MessageID.newBuilder().setID(stub.getMsg().getMsgId()).build();
BatchDataReader batchDataReader = new BatchDataReader(msgID);
List<BatchChunk> batchChunkList = null;
BatchDataCombiner combiner = new BatchDataCombiner(stub);
try {
batchChunkList = batchDataReader.call();
return combiner.call();
} catch (Exception e) {
throw new CommunicationException(e.getCause() + " " + e.getMessage());
}
return BulletinBoardUtils.gatherBatch(stub, batchChunkList);
}
@Override

View File

@ -224,7 +224,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
}
@Override
public BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException {
public BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException {
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()

View File

@ -36,75 +36,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
private Semaphore semaphore;
/**
* This class is a callback that deletes a message if it has been successfully posted
* It also calls a stored callback
*/
private class MessageDeleteCallback implements FutureCallback<Boolean> {
private final long entryNum;
private final FutureCallback<Void> callback;
public MessageDeleteCallback(long entryNum, FutureCallback<Void> callback) {
this.entryNum = entryNum;
this.callback = callback;
}
@Override
public void onSuccess(Boolean result) {
// Success: delete from database
localClient.deleteMessage(entryNum, null);
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}
/**
* This class aggregates the results from all of the post operations
* If any post has failed: it changes the sync status to SERVER_ERROR
* It also notifies the main sync loop when all uploads are finished
*/
private class SyncStatusUpdateCallback implements FutureCallback<Void> {
private int count;
private boolean errorEncountered;
public SyncStatusUpdateCallback(int count) {
this.count = count;
this.errorEncountered = false;
}
private void handleStatusUpdate() {
count--;
if (count <= 0) {
if (errorEncountered)
updateSyncStatus(SyncStatus.SERVER_ERROR);
// Upload is done: wake up the synchronizer loop
semaphore.release();
}
}
@Override
public void onSuccess(Void result) {
handleStatusUpdate();
}
@Override
public void onFailure(Throwable t) {
errorEncountered = true;
handleStatusUpdate();
}
}
private class SyncCallback implements FutureCallback<List<BulletinBoardMessage>> {
@Override
@ -122,8 +53,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
// Handle upload and status change
SyncStatusUpdateCallback syncStatusUpdateCallback = new SyncStatusUpdateCallback(result.size());
SyncStatus newStatus = SyncStatus.PENDING;
if (result.size() == 0) {
@ -143,13 +72,17 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
BulletinBoardMessage completeMsg = localClient.readBatchData(message);
remoteClient.postMessage(completeMsg, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback));
remoteClient.postMessage(completeMsg);
localClient.deleteMessage(completeMsg.getEntryNum());
} else {
// This is a regular message: post it
remoteClient.postMessage(message, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback));
remoteClient.postMessage(message);
localClient.deleteMessage(message.getEntryNum());
}

View File

@ -7,6 +7,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import meerkat.bulletinboard.workers.singleserver.*;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto;
import meerkat.protobuf.Voting.BulletinBoardClientParams;
@ -166,91 +168,74 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
}
}
private class ReadBatchCallback implements FutureCallback<List<BatchChunk>> {
private final BulletinBoardMessage stub;
private final FutureCallback<BulletinBoardMessage> callback;
public ReadBatchCallback(BulletinBoardMessage stub, FutureCallback<BulletinBoardMessage> callback) {
this.stub = stub;
this.callback = callback;
}
@Override
public void onSuccess(List<BatchChunk> result) {
callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, result));
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}
/**
* This callback ties together the different parts of a CompleteBatch as they arrive from the server
* It assembles a CompleteBatch from the parts and sends it to the user if all parts arrived
* If any part fails to arrive: it invokes the onFailure method
* This callback receives a message which may be a stub
* If the message is not a stub: it returns it as is to a callback function
* If it is a stub: it schedules a read of the batch data which will return a complete message to the callback function
*/
class CompleteBatchReadCallback {
class CompleteMessageReadCallback implements FutureCallback<List<BulletinBoardMessage>>{
private final FutureCallback<BulletinBoardMessage> callback;
private List<BatchChunk> batchChunkList;
private BulletinBoardMessage stub;
private AtomicInteger remainingQueries;
private AtomicBoolean failed;
public CompleteBatchReadCallback(FutureCallback<BulletinBoardMessage> callback) {
public CompleteMessageReadCallback(FutureCallback<BulletinBoardMessage> callback) {
this.callback = callback;
remainingQueries = new AtomicInteger(2);
failed = new AtomicBoolean(false);
}
protected void combineAndReturn() {
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
if (result.size() <= 0) {
onFailure(new CommunicationException("Could not find required message on the server."));
} else {
if (remainingQueries.decrementAndGet() == 0){
BulletinBoardMessage msg = result.get(0);
if (callback != null)
callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, batchChunkList));
}
if (msg.getMsg().getDataTypeCase() != UnsignedBulletinBoardMessage.DataTypeCase.MSGID) {
callback.onSuccess(msg);
} else {
}
// Create job with MAX retries for retrieval of the Batch Data List
protected void fail(Throwable t) {
if (failed.compareAndSet(false, true)) {
if (callback != null)
callback.onFailure(t);
BatchQuery batchQuery = BatchQuery.newBuilder()
.setMsgID(MessageID.newBuilder()
.setID(msg.getMsg().getMsgId())
.build())
.build();
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES);
scheduleWorker(batchWorker, new ReadBatchCallback(msg, callback));
}
}
}
/**
* @return a FutureCallback for the Batch Data List that ties to this object
*/
public FutureCallback<List<BatchChunk>> asBatchDataListFutureCallback() {
return new FutureCallback<List<BatchChunk>>() {
@Override
public void onSuccess(List<BatchChunk> result) {
batchChunkList = result;
combineAndReturn();
}
@Override
public void onFailure(Throwable t) {
fail(t);
}
};
}
/**
* @return a FutureCallback for the Bulletin Board Message that ties to this object
*/
public FutureCallback<List<BulletinBoardMessage>> asBulletinBoardMessageListFutureCallback() {
return new FutureCallback<List<BulletinBoardMessage>>() {
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
if (result.size() < 1){
onFailure(new IllegalArgumentException("Server returned empty message list"));
return;
}
stub = result.get(0);
combineAndReturn();
}
@Override
public void onFailure(Throwable t) {
fail(t);
}
};
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}
@ -574,9 +559,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
}
@Override
public void readBatch(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
public void readMessage(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
// Create job with MAX retries for retrieval of the Bulletin Board Message that defines the batch
// Create job with MAX retries for retrieval of the Bulletin Board Message (which may be a stub)
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
@ -592,39 +577,11 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
SingleServerReadMessagesWorker messageWorker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, MAX_RETRIES);
// Create job with MAX retries for retrieval of the Batch Data List
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(meerkatDBs.get(0), batchQuery, MAX_RETRIES);
// Create callback that will combine the two worker products
CompleteBatchReadCallback completeBatchReadCallback = new CompleteBatchReadCallback(callback);
// Submit jobs with wrapped callbacks
scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, completeBatchReadCallback.asBulletinBoardMessageListFutureCallback()));
scheduleWorker(batchWorker, new RetryCallback<>(batchWorker, completeBatchReadCallback.asBatchDataListFutureCallback()));
scheduleWorker(messageWorker, new RetryCallback<>(messageWorker, new CompleteMessageReadCallback(callback)));
}
private class ReadBatchCallback implements FutureCallback<List<BatchChunk>> {
private final BulletinBoardMessage stub;
private final FutureCallback<BulletinBoardMessage> callback;
public ReadBatchCallback(BulletinBoardMessage stub, FutureCallback<BulletinBoardMessage> callback) {
this.stub = stub;
this.callback = callback;
}
@Override
public void onSuccess(List<BatchChunk> result) {
callback.onSuccess(BulletinBoardUtils.gatherBatch(stub, result));
}
@Override
public void onFailure(Throwable t) {
}
}
@Override
public void readBatchData(BulletinBoardMessage stub, FutureCallback<BulletinBoardMessage> callback) throws IllegalArgumentException{

View File

@ -4,7 +4,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.Timestamp;
import meerkat.bulletinboard.workers.multiserver.*;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Voting.*;
@ -208,11 +207,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
}
@Override
public void readBatch(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
public void readMessage(MessageID msgID, FutureCallback<BulletinBoardMessage> callback) {
//Create job
MultiServerReadBatchWorker worker =
new MultiServerReadBatchWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback);
MultiServerReadMessageWorker worker =
new MultiServerReadMessageWorker(clients, minAbsoluteRedundancy, msgID, READ_MESSAGES_RETRY_NUM, callback);
// Submit job
executorService.submit(worker);

View File

@ -22,7 +22,7 @@ public class MultiServerReadBatchDataWorker extends MultiServerGenericReadWorker
@Override
protected void doRead(MessageID payload, SingleServerBulletinBoardClient client) {
client.readBatch(payload, this);
client.readMessage(payload, this);
}

View File

@ -11,11 +11,11 @@ import java.util.List;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker<MessageID, BulletinBoardMessage> {
public class MultiServerReadMessageWorker extends MultiServerGenericReadWorker<MessageID, BulletinBoardMessage> {
public MultiServerReadBatchWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, MessageID payload, int maxRetry,
FutureCallback<BulletinBoardMessage> futureCallback) {
public MultiServerReadMessageWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, MessageID payload, int maxRetry,
FutureCallback<BulletinBoardMessage> futureCallback) {
super(clients, minServers, payload, maxRetry, futureCallback);
@ -23,7 +23,7 @@ public class MultiServerReadBatchWorker extends MultiServerGenericReadWorker<Mes
@Override
protected void doRead(MessageID payload, SingleServerBulletinBoardClient client) {
client.readBatch(payload, this);
client.readMessage(payload, this);
}

View File

@ -1,7 +1,8 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.ByteString;
import com.google.protobuf.*;
import com.google.protobuf.Timestamp;
import static meerkat.bulletinboard.BulletinBoardSynchronizer.SyncStatus;
@ -57,7 +58,7 @@ public class BulletinBoardSynchronizerTest {
private static String KEYFILE_PASSWORD1 = "secret";
private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
private static GenericBatchDigitalSignature[] signers;
private static BulletinBoardSignature[] signers;
private static ByteString[] signerIDs;
private Semaphore semaphore;
@ -69,10 +70,10 @@ public class BulletinBoardSynchronizerTest {
messageGenerator = new BulletinBoardMessageGenerator(new Random(0));
messageComparator = new BulletinBoardMessageComparator();
signers = new GenericBatchDigitalSignature[1];
signers = new BulletinBoardSignature[1];
signerIDs = new ByteString[1];
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[0] = new GenericBulletinBoardSignature(new ECDSASignature());
signerIDs[0] = signers[0].getSignerID();
InputStream keyStream = BulletinBoardSynchronizerTest.class.getResourceAsStream(KEYFILE_EXAMPLE);
@ -203,15 +204,34 @@ public class BulletinBoardSynchronizerTest {
@Test
public void testSync() throws SignatureException, CommunicationException, InterruptedException {
final int BATCH_ID = 1;
Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(15252162)
.setNanos(85914)
.build();
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10);
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, timestamp, 10, 10);
MessageID msgID = localClient.postMessage(msg);
CompleteBatch completeBatch = messageGenerator.generateRandomBatch(signers[0],BATCH_ID,10,10,10);
timestamp = Timestamp.newBuilder()
.setSeconds(51511653)
.setNanos(3625)
.build();
localClient.postBatch(completeBatch);
BulletinBoardMessage batchMessage = messageGenerator.generateRandomMessage(signers,timestamp, 100, 10);
MessageID batchMsgID = localClient.postAsBatch(batchMessage, 10);
BulletinBoardMessage test = localClient.readMessage(batchMsgID);
BulletinBoardMessage stub = localClient.readMessages(MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.MSG_ID)
.setId(batchMsgID.getID())
.build())
.build()).get(0);
BulletinBoardMessage test2 = localClient.readBatchData(stub);
synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SYNCHRONIZED));
@ -244,19 +264,21 @@ public class BulletinBoardSynchronizerTest {
assertThat("Wrong number of messages returned.", msgList.size() == 1);
assertThat("Returned message is not equal to original one", messageComparator.compare(msgList.get(0),msg) == 0);
CompleteBatch returnedBatch = remoteClient.readBatch(BatchSpecificationMessage.newBuilder()
.setSignerId(signerIDs[0])
.setBatchId(BATCH_ID)
.build());
BulletinBoardMessage returnedBatchMsg = remoteClient.readMessage(batchMsgID);
assertThat("Returned batch does not equal original one.", completeBatch.equals(returnedBatch));
assertThat("Returned batch does not equal original one.", messageComparator.compare(returnedBatchMsg, batchMessage) == 0);
}
@Test
public void testServerError() throws SignatureException, CommunicationException, InterruptedException {
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10);
Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(945736256)
.setNanos(276788)
.build();
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, timestamp, 10, 10);
remoteClient.close();
@ -275,17 +297,18 @@ public class BulletinBoardSynchronizerTest {
synchronizer.stop();
thread.join();
if (thrown.size() > 0) {
for (Throwable t : thrown)
System.err.println(t.getMessage());
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false);
}
}
@After
public void close() {
if (thrown.size() > 0) {
for (Throwable t : thrown) {
System.err.println(t.getMessage());
}
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false);
}
synchronizer.stop();
localClient.close();
remoteClient.close();

View File

@ -5,9 +5,13 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import meerkat.comm.CommunicationException;
import meerkat.crypto.concrete.ECDSASignature;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto;
import meerkat.util.BulletinBoardMessageComparator;
import meerkat.util.BulletinBoardMessageGenerator;
import meerkat.util.BulletinBoardUtils;
import meerkat.bulletinboard.AsyncBulletinBoardClient.BatchIdentifier;
import java.io.IOException;
import java.io.InputStream;
@ -27,7 +31,7 @@ public class GenericBulletinBoardClientTester {
// Signature resources
private GenericBatchDigitalSignature signers[];
private BulletinBoardSignature signers[];
private ByteString[] signerIDs;
private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12";
@ -48,13 +52,15 @@ public class GenericBulletinBoardClientTester {
private RedundancyCallback redundancyCallback;
private ReadCallback readCallback;
private ReadBatchCallback readBatchCallback;
// Sync and misc
private Semaphore jobSemaphore;
private Vector<Throwable> thrown;
private Random random;
private BulletinBoardMessageGenerator generator;
private BulletinBoardDigest digest;
// Constructor
@ -62,10 +68,10 @@ public class GenericBulletinBoardClientTester {
this.bulletinBoardClient = bulletinBoardClient;
signers = new GenericBatchDigitalSignature[2];
signers = new GenericBulletinBoardSignature[2];
signerIDs = new ByteString[signers.length];
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[1] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[0] = new GenericBulletinBoardSignature(new ECDSASignature());
signers[1] = new GenericBulletinBoardSignature(new ECDSASignature());
InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE);
char[] password = KEYFILE_PASSWORD1.toCharArray();
@ -107,6 +113,10 @@ public class GenericBulletinBoardClientTester {
fail("Couldn't find signing key " + e.getMessage());
}
this.random = new Random(0);
this.generator = new BulletinBoardMessageGenerator(random);
this.digest = new GenericBulletinBoardDigest(new SHA256Digest());
}
// Callback definitions
@ -137,16 +147,21 @@ public class GenericBulletinBoardClientTester {
@Override
public void onSuccess(Boolean msg) {
System.err.println("Post operation completed");
jobSemaphore.release();
//TODO: Change Assert mechanism to exception one
if (isAssert) {
if (assertValue) {
assertThat("Post operation failed", msg, is(Boolean.TRUE));
if (assertValue && !msg) {
genericHandleFailure(new AssertionError("Post operation failed"));
} else if (!assertValue && msg){
genericHandleFailure(new AssertionError("Post operation succeeded unexpectedly"));
} else {
assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE));
jobSemaphore.release();
}
} else {
jobSemaphore.release();
}
}
@Override
@ -209,21 +224,24 @@ public class GenericBulletinBoardClientTester {
}
}
private class ReadBatchCallback implements FutureCallback<CompleteBatch> {
private class ReadBatchCallback implements FutureCallback<BulletinBoardMessage>{
private CompleteBatch expectedBatch;
private BulletinBoardMessage expectedMsg;
public ReadBatchCallback(CompleteBatch expectedBatch) {
this.expectedBatch = expectedBatch;
public ReadBatchCallback(BulletinBoardMessage expectedMsg) {
this.expectedMsg = expectedMsg;
}
@Override
public void onSuccess(CompleteBatch batch) {
public void onSuccess(BulletinBoardMessage msg) {
System.err.println(batch);
jobSemaphore.release();
BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator();
assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch)));
if (msgComparator.compare(msg, expectedMsg) != 0) {
genericHandleFailure(new AssertionError("Batch read returned different message.\nExpected:" + expectedMsg + "\nRecieved:" + msg + "\n"));
} else {
jobSemaphore.release();
}
}
@ -233,59 +251,6 @@ public class GenericBulletinBoardClientTester {
}
}
// Randomness generators
private byte randomByte(){
return (byte) random.nextInt();
}
private byte[] randomByteArray(int length) {
byte[] randomBytes = new byte[length];
for (int i = 0; i < length ; i++){
randomBytes[i] = randomByte();
}
return randomBytes;
}
private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException {
CompleteBatch completeBatch = new CompleteBatch();
// Create data
completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder()
.setSignerId(signerIDs[signer])
.setBatchId(batchId)
.addTag("Test")
.build());
for (int i = 0 ; i < length ; i++){
BatchChunk batchChunk = BatchChunk.newBuilder()
.setData(ByteString.copyFrom(randomByteArray(i)))
.build();
completeBatch.appendBatchData(batchChunk);
}
completeBatch.setTimestamp(Timestamp.newBuilder()
.setSeconds(Math.abs(90))
.setNanos(50)
.build());
signers[signer].updateContent(completeBatch);
completeBatch.setSignature(signers[signer].sign());
return completeBatch;
}
// Test methods
/**
@ -310,7 +275,13 @@ public class GenericBulletinBoardClientTester {
public void close() {
if (thrown.size() > 0) {
for (Throwable t : thrown){
System.err.println(t.getMessage());
}
assert false;
}
}
@ -394,59 +365,54 @@ public class GenericBulletinBoardClientTester {
/**
* Tests posting a batch by parts
* Also tests not being able to post to a closed batch
* @throws CommunicationException, SignatureException, InterruptedException
*/
public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException {
final int SIGNER = 1;
final int BATCH_ID = 100;
final int BATCH_LENGTH = 100;
final int CHUNK_SIZE = 10;
final int TAG_NUM = 10;
CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH);
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM);
// Begin batch
bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback);
bulletinBoardClient.beginBatch(msg.getMsg().getTagList(), new FutureCallback<BatchIdentifier>() {
@Override
public void onSuccess(final BatchIdentifier identifier) {
bulletinBoardClient.postBatchData(identifier, BulletinBoardUtils.breakToBatch(msg, CHUNK_SIZE), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
bulletinBoardClient.closeBatch(identifier, msg.getMsg().getTimestamp(), msg.getSigList(), postCallback);
}
@Override
public void onFailure(Throwable t) {
genericHandleFailure(t);
}
});
}
@Override
public void onFailure(Throwable t) {
genericHandleFailure(t);
}
});
jobSemaphore.acquire();
// Post data
digest.reset();
digest.update(msg);
bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback);
jobSemaphore.acquire();
// Close batch
CloseBatchMessage closeBatchMessage = completeBatch.getCloseBatchMessage();
bulletinBoardClient.closeBatch(closeBatchMessage, postCallback);
jobSemaphore.acquire();
// Attempt to open batch again
bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback);
// Attempt to add batch data
bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback);
jobSemaphore.acquire(2);
// Read batch data
BatchSpecificationMessage batchSpecificationMessage =
BatchSpecificationMessage.newBuilder()
.setSignerId(signerIDs[SIGNER])
.setBatchId(BATCH_ID)
.setStartPosition(0)
.build();
readBatchCallback = new ReadBatchCallback(completeBatch);
bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback);
bulletinBoardClient.readMessage(digest.digestAsMessageID(), new ReadBatchCallback(msg));
jobSemaphore.acquire();
@ -454,62 +420,56 @@ public class GenericBulletinBoardClientTester {
/**
* Posts a complete batch message
* Checks reading of the message
* Checks reading of the message in two parts
* @throws CommunicationException, SignatureException, InterruptedException
*/
public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException {
final int SIGNER = 0;
final int BATCH_ID = 101;
final int BATCH_LENGTH = 50;
final int BATCH_LENGTH = 100;
final int CHUNK_SIZE = 99;
final int TAG_NUM = 8;
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM);
// Post batch
CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH);
bulletinBoardClient.postBatch(completeBatch,postCallback);
MessageID msgID = bulletinBoardClient.postAsBatch(msg, CHUNK_SIZE, postCallback);
jobSemaphore.acquire();
// Read batch
BatchSpecificationMessage batchSpecificationMessage =
BatchSpecificationMessage.newBuilder()
.setSignerId(signerIDs[SIGNER])
.setBatchId(BATCH_ID)
.setStartPosition(0)
.build();
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.MSG_ID)
.setId(msgID.getID())
.build())
.build();
readBatchCallback = new ReadBatchCallback(completeBatch);
bulletinBoardClient.readMessages(filterList, new FutureCallback<List<BulletinBoardMessage>>() {
bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback);
@Override
public void onSuccess(List<BulletinBoardMessage> msgList) {
jobSemaphore.acquire();
if (msgList.size() != 1) {
}
genericHandleFailure(new AssertionError("Wrong number of stubs returned. Expected: 1; Found: " + msgList.size()));
/**
* Tests that an unopened batch cannot be closed
* @throws CommunicationException, InterruptedException
*/
public void testInvalidBatchClose() throws CommunicationException, InterruptedException {
} else {
final int NON_EXISTENT_BATCH_ID = 999;
BulletinBoardMessage retrievedMsg = msgList.get(0);
bulletinBoardClient.readBatchData(retrievedMsg, new ReadBatchCallback(msg));
CloseBatchMessage closeBatchMessage =
CloseBatchMessage.newBuilder()
.setBatchId(NON_EXISTENT_BATCH_ID)
.setBatchLength(1)
.setSig(Crypto.Signature.getDefaultInstance())
.setTimestamp(Timestamp.newBuilder()
.setSeconds(9)
.setNanos(12)
.build())
.build();
}
// Try to stop the (unopened) batch;
}
bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback);
@Override
public void onFailure(Throwable t) {
genericHandleFailure(t);
}
});
jobSemaphore.acquire();

View File

@ -23,7 +23,7 @@ import static org.junit.Assert.fail;
*/
public class GenericSubscriptionClientTester {
private GenericBatchDigitalSignature signers[];
private BulletinBoardSignature signers[];
private ByteString[] signerIDs;
private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12";
@ -47,10 +47,10 @@ public class GenericSubscriptionClientTester {
this.bulletinBoardClient = bulletinBoardClient;
signers = new GenericBatchDigitalSignature[2];
signers = new BulletinBoardSignature[2];
signerIDs = new ByteString[signers.length];
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[1] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[0] = new GenericBulletinBoardSignature(new ECDSASignature());
signers[1] = new GenericBulletinBoardSignature(new ECDSASignature());
InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE);
char[] password = KEYFILE_PASSWORD1.toCharArray();

View File

@ -100,13 +100,6 @@ public class LocalBulletinBoardClientTest {
}
@Test
public void testInvalidBatchClose() throws CommunicationException, InterruptedException {
clientTest.testInvalidBatchClose();
}
@Test
public void testSubscription() throws SignatureException, CommunicationException {
subscriptionTester.init();

View File

@ -85,11 +85,4 @@ public class ThreadedBulletinBoardClientIntegrationTest {
}
@Test
public void testInvalidBatchClose() throws CommunicationException, InterruptedException {
clientTest.testInvalidBatchClose();
}
}

View File

@ -19,6 +19,7 @@ import meerkat.crypto.DigitalSignature;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm;
import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Crypto.SignatureVerificationKey;
@ -546,9 +547,17 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
}
private void checkConnection() throws CommunicationException {
if (jdbcTemplate == null) {
throw new CommunicationException("DB connection not initialized");
}
}
@Override
public BoolValue postMessage(BulletinBoardMessage msg) throws CommunicationException {
checkConnection();
// Perform a post, calculate the message ID and check the signature for authenticity
if (postMessage(msg, null) != -1){
return BoolValue.newBuilder().setValue(true).build(); // Message was posted
@ -561,6 +570,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public BoolValue deleteMessage(MessageID msgID) throws CommunicationException {
checkConnection();
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ID);
Map namedParameters = new HashMap();
@ -577,6 +588,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public BoolValue deleteMessage(long entryNum) throws CommunicationException {
checkConnection();
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ENTRY);
Map namedParameters = new HashMap();
@ -702,6 +715,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public void readMessages(MessageFilterList filterList, MessageOutputStream<BulletinBoardMessage> out) throws CommunicationException {
checkConnection();
BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder();
// SQL length is roughly 50 characters per filter + 50 for the query itself
@ -725,6 +740,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public Int32Value getMessageCount(MessageFilterList filterList) throws CommunicationException {
checkConnection();
BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder();
// SQL length is roughly 50 characters per filter + 50 for the query itself
@ -793,6 +810,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public Int64Value beginBatch(BeginBatchMessage message) throws CommunicationException {
checkConnection();
// Store tags
String sql = sqlQueryProvider.getSQLString(QueryType.STORE_BATCH_TAGS);
MapSqlParameterSource namedParameters = new MapSqlParameterSource();
@ -814,6 +833,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public BoolValue postBatchMessage(BatchMessage batchMessage) throws CommunicationException{
checkConnection();
// Make sure batch is open
if (!isBatchOpen(batchMessage.getBatchId())) {
return BoolValue.newBuilder().setValue(false).build();
@ -837,6 +858,7 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public BoolValue closeBatch(CloseBatchMessage message) throws CommunicationException {
checkConnection();
// Check batch size
@ -916,6 +938,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public void readBatch(BatchQuery batchQuery, MessageOutputStream<BatchChunk> out) throws CommunicationException, IllegalArgumentException{
checkConnection();
// Check that batch is closed
if (!isBatchClosed(batchQuery.getMsgID())) {
throw new IllegalArgumentException("No such batch");
@ -950,7 +974,9 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
}
@Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) {
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException{
checkConnection();
if (generateSyncQueryParams == null
|| !generateSyncQueryParams.hasFilterList()
@ -1029,6 +1055,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public SyncQueryResponse querySync(SyncQuery syncQuery) throws CommunicationException {
checkConnection();
if (syncQuery == null){
return SyncQueryResponse.newBuilder()
.setLastEntryNum(-1)

View File

@ -87,19 +87,20 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
* Read all messages posted matching the given filter in an asynchronous manner
* Note that if messages haven't been "fully posted", this might return a different
* set of messages in different calls. However, messages that are fully posted
* are guaranteed to be included.
* are guaranteed to be included
* Also: batch messages are returned as stubs.
* @param filterList return only messages that match the filters (null means no filtering).
* @param filterList return only messages that match the filters (null means no filtering)
* @param callback is a callback function class for handling results of the operation
*/
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback);
/**
* Read a given batch message from the bulletin board
* @param msgID is the batch message ID to be read
* Read a given message from the bulletin board
* If the message is a batch: returns a complete message containing the batch data as well as the metadata
* @param msgID is the ID of the message to be read
* @param callback is a callback class for handling the result of the operation
*/
public void readBatch(MessageID msgID, FutureCallback<BulletinBoardMessage> callback);
public void readMessage(MessageID msgID, FutureCallback<BulletinBoardMessage> callback);
/**
* Read batch data for a specific stub message

View File

@ -55,12 +55,13 @@ public interface BulletinBoardClient {
MessageID postAsBatch(BulletinBoardMessage msg, int chunkSize) throws CommunicationException;
/**
* Read a given batch message from the bulletin board
* @param msgID is the batch message ID to be read
* @return the complete batch
* Read a given message from the bulletin board
* If the message is a batch: returns a complete message containing the batch data as well as the metadata
* @param msgID is the ID of the message to be read
* @return the complete message
* @throws CommunicationException if operation is unsuccessful
*/
BulletinBoardMessage readBatch(MessageID msgID) throws CommunicationException;
BulletinBoardMessage readMessage(MessageID msgID) throws CommunicationException;
/**
* Read batch data for a specific stub message