parent
4c33e923b2
commit
7c60e487cc
|
@ -1,22 +1,13 @@
|
||||||
package meerkat.bulletinboard;
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.Timestamp;
|
|
||||||
import meerkat.bulletinboard.workers.singleserver.*;
|
import meerkat.bulletinboard.workers.singleserver.*;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
import meerkat.comm.MessageInputStream;
|
|
||||||
import meerkat.crypto.Digest;
|
|
||||||
import meerkat.crypto.concrete.SHA256Digest;
|
import meerkat.crypto.concrete.SHA256Digest;
|
||||||
import meerkat.protobuf.BulletinBoardAPI;
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Voting.*;
|
import meerkat.protobuf.Voting.*;
|
||||||
import meerkat.rest.*;
|
import meerkat.rest.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.lang.reflect.InvocationTargetException;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.ws.rs.client.Client;
|
import javax.ws.rs.client.Client;
|
||||||
|
@ -68,7 +59,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
||||||
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
|
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
|
||||||
|
|
||||||
WebTarget webTarget;
|
WebTarget webTarget;
|
||||||
Response response;
|
Response response = null;
|
||||||
|
|
||||||
// Post message to all databases
|
// Post message to all databases
|
||||||
try {
|
try {
|
||||||
|
@ -80,10 +71,17 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
||||||
if (response.getStatusInfo() == Response.Status.OK
|
if (response.getStatusInfo() == Response.Status.OK
|
||||||
|| response.getStatusInfo() == Response.Status.CREATED) {
|
|| response.getStatusInfo() == Response.Status.CREATED) {
|
||||||
response.readEntity(BoolMsg.class).getValue();
|
response.readEntity(BoolMsg.class).getValue();
|
||||||
|
} else {
|
||||||
|
throw new CommunicationException("Server returned error. Status was: " + response.getStatus());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) { // Occurs only when server replies with valid status but invalid data
|
} catch (Exception e) { // Occurs only when server replies with valid status but invalid data
|
||||||
throw new CommunicationException("Error accessing database: " + e.getMessage());
|
|
||||||
|
throw new CommunicationException("Server returned invalid data type: " + e.getMessage());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (response != null)
|
||||||
|
response.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the correct message ID and return it
|
// Calculate the correct message ID and return it
|
||||||
|
@ -100,7 +98,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
||||||
int batchID = completeBatch.getBeginBatchMessage().getBatchId();
|
int batchID = completeBatch.getBeginBatchMessage().getBatchId();
|
||||||
|
|
||||||
// Post message to all databases
|
// Post message to all databases
|
||||||
try {
|
|
||||||
for (String db : meerkatDBs) {
|
for (String db : meerkatDBs) {
|
||||||
|
|
||||||
SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0);
|
SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0);
|
||||||
|
@ -128,9 +126,6 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
||||||
closeBatchWorker.call();
|
closeBatchWorker.call();
|
||||||
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) { // Occurs only when server replies with valid status but invalid data
|
|
||||||
throw new CommunicationException("Error accessing database: " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
digest.update(completeBatch);
|
digest.update(completeBatch);
|
||||||
return digest.digestAsMessageID();
|
return digest.digestAsMessageID();
|
||||||
|
|
|
@ -8,6 +8,8 @@ import meerkat.util.BulletinBoardUtils;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Arbel on 13/04/2016.
|
* Created by Arbel on 13/04/2016.
|
||||||
|
@ -25,24 +27,75 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
|
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
|
||||||
private static final int SLEEP_INTERVAL = 10000; // 10 Seconds
|
private static final int SLEEP_INTERVAL = 10000; // 10 Seconds
|
||||||
|
private static final int WAIT_CAP = 300000; // 5 minutes wait before deciding that the sync has failed fatally
|
||||||
|
|
||||||
|
private Semaphore semaphore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is a callback that deletes a message if it has been successfully posted
|
||||||
|
* It also calls a stored callback
|
||||||
|
*/
|
||||||
private class MessageDeleteCallback implements FutureCallback<Boolean> {
|
private class MessageDeleteCallback implements FutureCallback<Boolean> {
|
||||||
|
|
||||||
private final long entryNum;
|
private final long entryNum;
|
||||||
|
private final FutureCallback<Void> callback;
|
||||||
|
|
||||||
public MessageDeleteCallback(long entryNum) {
|
public MessageDeleteCallback(long entryNum, FutureCallback<Void> callback) {
|
||||||
this.entryNum = entryNum;
|
this.entryNum = entryNum;
|
||||||
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Boolean result) {
|
public void onSuccess(Boolean result) {
|
||||||
// Success: delete from database
|
// Success: delete from database
|
||||||
localClient.deleteMessage(entryNum, null);
|
localClient.deleteMessage(entryNum, null);
|
||||||
|
callback.onSuccess(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
// Ignore
|
callback.onFailure(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class aggregates the results from all of the post operations
|
||||||
|
* If any post has failed: it changes the sync status to SERVER_ERROR
|
||||||
|
* It also notifies the main sync loop when all uploads are finished
|
||||||
|
*/
|
||||||
|
private class SyncStatusUpdateCallback implements FutureCallback<Void> {
|
||||||
|
|
||||||
|
private int count;
|
||||||
|
private boolean errorEncountered;
|
||||||
|
|
||||||
|
public SyncStatusUpdateCallback(int count) {
|
||||||
|
this.count = count;
|
||||||
|
this.errorEncountered = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleStatusUpdate() {
|
||||||
|
count--;
|
||||||
|
if (count <= 0) {
|
||||||
|
|
||||||
|
if (errorEncountered)
|
||||||
|
updateSyncStatus(SyncStatus.SERVER_ERROR);
|
||||||
|
|
||||||
|
// Upload is done: wake up the synchronizer loop
|
||||||
|
semaphore.release();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Void result) {
|
||||||
|
handleStatusUpdate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
errorEncountered = true;
|
||||||
|
handleStatusUpdate();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -52,6 +105,20 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(List<BulletinBoardMessage> result) {
|
public void onSuccess(List<BulletinBoardMessage> result) {
|
||||||
|
|
||||||
|
// Notify Message Count callbacks if needed
|
||||||
|
|
||||||
|
if (syncStatus != SyncStatus.SYNCHRONIZED || result.size() > 0) {
|
||||||
|
|
||||||
|
for (FutureCallback<Integer> callback : messageCountCallbacks){
|
||||||
|
callback.onSuccess(result.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle upload and status change
|
||||||
|
|
||||||
|
SyncStatusUpdateCallback syncStatusUpdateCallback = new SyncStatusUpdateCallback(result.size());
|
||||||
|
|
||||||
SyncStatus newStatus = SyncStatus.PENDING;
|
SyncStatus newStatus = SyncStatus.PENDING;
|
||||||
|
|
||||||
if (result.size() == 0) {
|
if (result.size() == 0) {
|
||||||
|
@ -79,20 +146,21 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage);
|
CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage);
|
||||||
|
|
||||||
remoteClient.postBatch(completeBatch);
|
remoteClient.postBatch(completeBatch, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback));
|
||||||
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// This is a regular message: post it
|
// This is a regular message: post it
|
||||||
|
remoteClient.postMessage(message, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback));
|
||||||
remoteClient.postMessage(message);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
localClient.deleteMessage(message.getEntryNum());
|
localClient.deleteMessage(message.getEntryNum());
|
||||||
|
|
||||||
} catch (CommunicationException e) {
|
} catch (CommunicationException e) {
|
||||||
|
// This is an error with the local server
|
||||||
|
// TODO: log
|
||||||
updateSyncStatus(SyncStatus.SERVER_ERROR);
|
updateSyncStatus(SyncStatus.SERVER_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,6 +211,8 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
messageCountCallbacks = new LinkedList<>();
|
messageCountCallbacks = new LinkedList<>();
|
||||||
syncStatusCallbacks = new LinkedList<>();
|
syncStatusCallbacks = new LinkedList<>();
|
||||||
|
|
||||||
|
semaphore = new Semaphore(0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -185,18 +255,32 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
while (syncStatus != SyncStatus.STOPPED) {
|
while (syncStatus != SyncStatus.STOPPED) {
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
do {
|
do {
|
||||||
localClient.readMessages(EMPTY_FILTER, callback);
|
|
||||||
} while (syncStatus == SyncStatus.PENDING);
|
|
||||||
|
|
||||||
synchronized (this) {
|
if (syncStatus == SyncStatus.PENDING || syncStatus == SyncStatus.SERVER_ERROR) {
|
||||||
this.wait(SLEEP_INTERVAL);
|
|
||||||
|
localClient.readMessages(EMPTY_FILTER, callback);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
semaphore.tryAcquire(WAIT_CAP, TimeUnit.MILLISECONDS);
|
||||||
|
//TODO: log hard error. Too much time trying to upload data.
|
||||||
|
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
// We expect an interruption when the upload will complete
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} while (syncStatus == SyncStatus.PENDING);
|
||||||
|
|
||||||
|
// Database is synced. Wait for new data.
|
||||||
|
|
||||||
|
try {
|
||||||
|
semaphore.tryAcquire(SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
//TODO: log (probably nudged)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -207,9 +291,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nudge() {
|
public void nudge() {
|
||||||
synchronized (this) {
|
semaphore.release();
|
||||||
this.notify();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -6,7 +6,6 @@ import meerkat.comm.MessageInputStream;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.rest.Constants;
|
import meerkat.rest.Constants;
|
||||||
|
|
||||||
import javax.ws.rs.ProcessingException;
|
|
||||||
import javax.ws.rs.client.Client;
|
import javax.ws.rs.client.Client;
|
||||||
import javax.ws.rs.client.Entity;
|
import javax.ws.rs.client.Entity;
|
||||||
import javax.ws.rs.client.WebTarget;
|
import javax.ws.rs.client.WebTarget;
|
||||||
|
@ -14,7 +13,6 @@ import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
|
||||||
|
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH;
|
import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH;
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
import static meerkat.bulletinboard.BulletinBoardSynchronizer.SyncStatus;
|
||||||
|
|
||||||
|
import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer;
|
||||||
|
import meerkat.bulletinboard.sqlserver.H2QueryProvider;
|
||||||
|
|
||||||
|
import meerkat.comm.CommunicationException;
|
||||||
|
import meerkat.crypto.concrete.ECDSASignature;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
|
import meerkat.util.BulletinBoardMessageComparator;
|
||||||
|
import meerkat.util.BulletinBoardMessageGenerator;
|
||||||
|
import org.junit.*;
|
||||||
|
import org.springframework.jdbc.core.RowMapper;
|
||||||
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.security.*;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel on 6/1/2016.
|
||||||
|
*/
|
||||||
|
public class BulletinBoardSynchronizerTest {
|
||||||
|
|
||||||
|
private static final String REMOTE_SERVER_ADDRESS = "remoteDB";
|
||||||
|
private static final String LOCAL_SERVER_ADDRESS = "localDB";
|
||||||
|
|
||||||
|
private static final int THREAD_NUM = 3;
|
||||||
|
private static final int SUBSCRIPTION_INTERVAL = 1000;
|
||||||
|
|
||||||
|
private DeletableSubscriptionBulletinBoardClient localClient;
|
||||||
|
private AsyncBulletinBoardClient remoteClient;
|
||||||
|
|
||||||
|
private BulletinBoardSynchronizer synchronizer;
|
||||||
|
|
||||||
|
private static BulletinBoardMessageGenerator messageGenerator;
|
||||||
|
private static BulletinBoardMessageComparator messageComparator;
|
||||||
|
|
||||||
|
private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12";
|
||||||
|
private static String KEYFILE_PASSWORD1 = "secret";
|
||||||
|
private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
|
||||||
|
|
||||||
|
private static GenericBatchDigitalSignature[] signers;
|
||||||
|
private static ByteString[] signerIDs;
|
||||||
|
|
||||||
|
private Semaphore semaphore;
|
||||||
|
private List<Throwable> thrown;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void build() {
|
||||||
|
|
||||||
|
messageGenerator = new BulletinBoardMessageGenerator(new Random(0));
|
||||||
|
messageComparator = new BulletinBoardMessageComparator();
|
||||||
|
|
||||||
|
signers = new GenericBatchDigitalSignature[1];
|
||||||
|
signerIDs = new ByteString[1];
|
||||||
|
|
||||||
|
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
|
||||||
|
|
||||||
|
InputStream keyStream = BulletinBoardSynchronizerTest.class.getResourceAsStream(KEYFILE_EXAMPLE);
|
||||||
|
char[] password = KEYFILE_PASSWORD1.toCharArray();
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
KeyStore.Builder keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password);
|
||||||
|
|
||||||
|
signers[0].loadSigningCertificate(keyStoreBuilder);
|
||||||
|
|
||||||
|
signers[0].loadVerificationCertificates(BulletinBoardSynchronizerTest.class.getResourceAsStream(CERT1_PEM_EXAMPLE));
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Failed reading from signature file " + e.getMessage());
|
||||||
|
fail("Failed reading from signature file " + e.getMessage());
|
||||||
|
} catch (CertificateException e) {
|
||||||
|
System.err.println("Failed reading certificate " + e.getMessage());
|
||||||
|
fail("Failed reading certificate " + e.getMessage());
|
||||||
|
} catch (KeyStoreException e) {
|
||||||
|
System.err.println("Failed reading keystore " + e.getMessage());
|
||||||
|
fail("Failed reading keystore " + e.getMessage());
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
System.err.println("Couldn't find signing algorithm " + e.getMessage());
|
||||||
|
fail("Couldn't find signing algorithm " + e.getMessage());
|
||||||
|
} catch (UnrecoverableKeyException e) {
|
||||||
|
System.err.println("Couldn't find signing key " + e.getMessage());
|
||||||
|
fail("Couldn't find signing key " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
signerIDs[0] = signers[0].getSignerID();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws CommunicationException {
|
||||||
|
|
||||||
|
DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS));
|
||||||
|
remoteServer.init(REMOTE_SERVER_ADDRESS);
|
||||||
|
|
||||||
|
remoteClient = new LocalBulletinBoardClient(
|
||||||
|
remoteServer,
|
||||||
|
THREAD_NUM,
|
||||||
|
SUBSCRIPTION_INTERVAL);
|
||||||
|
|
||||||
|
DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS));
|
||||||
|
localServer.init(LOCAL_SERVER_ADDRESS);
|
||||||
|
|
||||||
|
localClient = new LocalBulletinBoardClient(
|
||||||
|
localServer,
|
||||||
|
THREAD_NUM,
|
||||||
|
SUBSCRIPTION_INTERVAL);
|
||||||
|
|
||||||
|
synchronizer = new SimpleBulletinBoardSynchronizer();
|
||||||
|
synchronizer.init(localClient, remoteClient);
|
||||||
|
|
||||||
|
semaphore = new Semaphore(0);
|
||||||
|
thrown = new LinkedList<>();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private class SyncStatusCallback implements FutureCallback<SyncStatus> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SyncStatus result) {
|
||||||
|
|
||||||
|
if (result == SyncStatus.SYNCHRONIZED){
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
thrown.add(t);
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MessageCountCallback implements FutureCallback<Integer> {
|
||||||
|
|
||||||
|
private int[] expectedCounts;
|
||||||
|
private int currentIteration;
|
||||||
|
|
||||||
|
public MessageCountCallback(int[] expectedCounts) {
|
||||||
|
this.expectedCounts = expectedCounts;
|
||||||
|
this.currentIteration = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Integer result) {
|
||||||
|
|
||||||
|
if (currentIteration < expectedCounts.length){
|
||||||
|
if (result != expectedCounts[currentIteration]){
|
||||||
|
onFailure(new AssertionError("Wrong message count. Expected " + expectedCounts[currentIteration] + " but received " + result));
|
||||||
|
currentIteration = expectedCounts.length;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentIteration++;
|
||||||
|
|
||||||
|
if (currentIteration == expectedCounts.length)
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
thrown.add(t);
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSync() throws SignatureException, CommunicationException, InterruptedException {
|
||||||
|
|
||||||
|
final int BATCH_ID = 1;
|
||||||
|
|
||||||
|
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10);
|
||||||
|
|
||||||
|
MessageID msgID = localClient.postMessage(msg);
|
||||||
|
|
||||||
|
CompleteBatch completeBatch = messageGenerator.generateRandomBatch(signers[0],BATCH_ID,10,10,10);
|
||||||
|
|
||||||
|
localClient.postBatch(completeBatch);
|
||||||
|
|
||||||
|
synchronizer.subscribeToSyncStatus(new SyncStatusCallback());
|
||||||
|
|
||||||
|
int[] expectedCounts = {2,0};
|
||||||
|
synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts));
|
||||||
|
|
||||||
|
Thread t = new Thread(synchronizer);
|
||||||
|
t.run();
|
||||||
|
|
||||||
|
semaphore.acquire();
|
||||||
|
|
||||||
|
synchronizer.stop();
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), thrown.size() == 0);
|
||||||
|
|
||||||
|
List<BulletinBoardMessage> msgList = remoteClient.readMessages(MessageFilterList.newBuilder()
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.MSG_ID)
|
||||||
|
.setId(msgID.getID())
|
||||||
|
.build())
|
||||||
|
.build());
|
||||||
|
|
||||||
|
assertThat("Wrong number of messages returned.", msgList.size() == 1);
|
||||||
|
assertThat("Returned message is not equal to original one", messageComparator.compare(msgList.get(0),msg) == 0);
|
||||||
|
|
||||||
|
CompleteBatch returnedBatch = remoteClient.readBatch(BatchSpecificationMessage.newBuilder()
|
||||||
|
.setSignerId(signerIDs[0])
|
||||||
|
.setBatchId(BATCH_ID)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
assertThat("Returned batch does not equal original one.", completeBatch.equals(returnedBatch));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void close() {
|
||||||
|
|
||||||
|
synchronizer.stop();
|
||||||
|
localClient.close();
|
||||||
|
remoteClient.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -6,7 +6,6 @@ import java.util.*;
|
||||||
import com.google.protobuf.*;
|
import com.google.protobuf.*;
|
||||||
|
|
||||||
import com.google.protobuf.Timestamp;
|
import com.google.protobuf.Timestamp;
|
||||||
import com.sun.org.apache.xpath.internal.operations.Bool;
|
|
||||||
import meerkat.bulletinboard.*;
|
import meerkat.bulletinboard.*;
|
||||||
import meerkat.bulletinboard.sqlserver.mappers.*;
|
import meerkat.bulletinboard.sqlserver.mappers.*;
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.*;
|
import static meerkat.bulletinboard.BulletinBoardConstants.*;
|
||||||
|
@ -17,7 +16,6 @@ import meerkat.comm.MessageOutputStream;
|
||||||
import meerkat.crypto.concrete.ECDSASignature;
|
import meerkat.crypto.concrete.ECDSASignature;
|
||||||
import meerkat.crypto.concrete.SHA256Digest;
|
import meerkat.crypto.concrete.SHA256Digest;
|
||||||
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI;
|
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.protobuf.Crypto.Signature;
|
import meerkat.protobuf.Crypto.Signature;
|
||||||
import meerkat.protobuf.Crypto.SignatureVerificationKey;
|
import meerkat.protobuf.Crypto.SignatureVerificationKey;
|
||||||
|
@ -29,7 +27,6 @@ import javax.sql.DataSource;
|
||||||
|
|
||||||
import meerkat.util.BulletinBoardUtils;
|
import meerkat.util.BulletinBoardUtils;
|
||||||
import meerkat.util.TimestampComparator;
|
import meerkat.util.TimestampComparator;
|
||||||
import org.springframework.jdbc.core.RowMapper;
|
|
||||||
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
||||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||||
import org.springframework.jdbc.support.GeneratedKeyHolder;
|
import org.springframework.jdbc.support.GeneratedKeyHolder;
|
||||||
|
|
|
@ -247,12 +247,12 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
|
||||||
list.add("CREATE TABLE IF NOT EXISTS TagTable (TagId INT NOT NULL AUTO_INCREMENT PRIMARY KEY, Tag VARCHAR(50) UNIQUE)");
|
list.add("CREATE TABLE IF NOT EXISTS TagTable (TagId INT NOT NULL AUTO_INCREMENT PRIMARY KEY, Tag VARCHAR(50) UNIQUE)");
|
||||||
|
|
||||||
list.add("CREATE TABLE IF NOT EXISTS MsgTagTable (EntryNum INT, TagId INT,"
|
list.add("CREATE TABLE IF NOT EXISTS MsgTagTable (EntryNum INT, TagId INT,"
|
||||||
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum),"
|
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum) ON DELETE CASCADE,"
|
||||||
+ " FOREIGN KEY (TagId) REFERENCES TagTable(TagId),"
|
+ " FOREIGN KEY (TagId) REFERENCES TagTable(TagId) ON DELETE CASCADE,"
|
||||||
+ " UNIQUE (EntryNum, TagID))");
|
+ " UNIQUE (EntryNum, TagID))");
|
||||||
|
|
||||||
list.add("CREATE TABLE IF NOT EXISTS SignatureTable (EntryNum INT, SignerId TINYBLOB, Signature TINYBLOB UNIQUE,"
|
list.add("CREATE TABLE IF NOT EXISTS SignatureTable (EntryNum INT, SignerId TINYBLOB, Signature TINYBLOB UNIQUE,"
|
||||||
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum))");
|
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum) ON DELETE CASCADE)");
|
||||||
|
|
||||||
list.add("CREATE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId)");
|
list.add("CREATE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId)");
|
||||||
list.add("CREATE UNIQUE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId, EntryNum)");
|
list.add("CREATE UNIQUE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId, EntryNum)");
|
||||||
|
@ -261,7 +261,7 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
|
||||||
+ " UNIQUE(SignerId, BatchId, SerialNum))");
|
+ " UNIQUE(SignerId, BatchId, SerialNum))");
|
||||||
|
|
||||||
list.add("CREATE TABLE IF NOT EXISTS BatchTagTable (SignerId TINYBLOB, BatchId INT, TagId INT,"
|
list.add("CREATE TABLE IF NOT EXISTS BatchTagTable (SignerId TINYBLOB, BatchId INT, TagId INT,"
|
||||||
+ " FOREIGN KEY (TagId) REFERENCES TagTable(TagId))");
|
+ " FOREIGN KEY (TagId) REFERENCES TagTable(TagId) ON DELETE CASCADE)");
|
||||||
|
|
||||||
list.add("CREATE INDEX IF NOT EXISTS BatchIndex ON BatchTagTable(SignerId, BatchId)");
|
list.add("CREATE INDEX IF NOT EXISTS BatchIndex ON BatchTagTable(SignerId, BatchId)");
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,8 @@ public interface BulletinBoardSynchronizer extends Runnable {
|
||||||
|
|
||||||
public enum SyncStatus{
|
public enum SyncStatus{
|
||||||
SYNCHRONIZED, // No more messages to upload
|
SYNCHRONIZED, // No more messages to upload
|
||||||
PENDING, // Synchronizer is uploading data
|
PENDING, // Synchronizer is querying for data to upload and uploading it as needed
|
||||||
SERVER_ERROR, // Synchronizer encountered an error while uploading
|
SERVER_ERROR, // Synchronizer encountered an error while uploading, but will retry
|
||||||
STOPPED // Stopped/Not started by user
|
STOPPED // Stopped/Not started by user
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,14 @@
|
||||||
package meerkat.util;
|
package meerkat.util;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import meerkat.bulletinboard.BatchDigitalSignature;
|
||||||
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
import meerkat.crypto.DigitalSignature;
|
import meerkat.crypto.DigitalSignature;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import com.google.protobuf.Timestamp;
|
import com.google.protobuf.Timestamp;
|
||||||
|
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.security.SignatureException;
|
import java.security.SignatureException;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -28,10 +29,33 @@ public class BulletinBoardMessageGenerator {
|
||||||
return (byte) random.nextInt();
|
return (byte) random.nextInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private byte[] randomBytes(int length) {
|
||||||
|
|
||||||
|
byte[] result = new byte[length];
|
||||||
|
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
result[i] = randomByte();
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
private String randomString(){
|
private String randomString(){
|
||||||
return new BigInteger(130, random).toString(32);
|
return new BigInteger(130, random).toString(32);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<String> randomStrings(int length) {
|
||||||
|
|
||||||
|
List<String> result = new LinkedList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
result.add(randomString());
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generates a complete instance of a BulletinBoardMessage
|
* Generates a complete instance of a BulletinBoardMessage
|
||||||
* @param signers contains the (possibly multiple) credentials required to sign the message
|
* @param signers contains the (possibly multiple) credentials required to sign the message
|
||||||
|
@ -46,23 +70,16 @@ public class BulletinBoardMessageGenerator {
|
||||||
|
|
||||||
// Generate random data.
|
// Generate random data.
|
||||||
|
|
||||||
byte[] data = new byte[dataSize];
|
|
||||||
String[] newTags = new String[tagNumber];
|
|
||||||
|
|
||||||
for (int i = 0; i < dataSize; i++) {
|
|
||||||
data[i] = randomByte();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < tagNumber; i++) {
|
|
||||||
newTags[i] = randomString();
|
|
||||||
}
|
|
||||||
|
|
||||||
UnsignedBulletinBoardMessage unsignedMessage =
|
UnsignedBulletinBoardMessage unsignedMessage =
|
||||||
UnsignedBulletinBoardMessage.newBuilder()
|
UnsignedBulletinBoardMessage.newBuilder()
|
||||||
.setData(ByteString.copyFrom(data))
|
.setData(ByteString.copyFrom(randomBytes(dataSize)))
|
||||||
.setTimestamp(timestamp)
|
.setTimestamp(timestamp)
|
||||||
.addAllTag(tags)
|
.addAllTag(tags)
|
||||||
.addAllTag(Arrays.asList(newTags))
|
.addAllTag(randomStrings(tagNumber))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
BulletinBoardMessage.Builder messageBuilder =
|
BulletinBoardMessage.Builder messageBuilder =
|
||||||
|
@ -102,7 +119,6 @@ public class BulletinBoardMessageGenerator {
|
||||||
* @param tagNumber is the number of tags to generate
|
* @param tagNumber is the number of tags to generate
|
||||||
* @return a random, signed Bulletin Board Message containing random data, tags and timestamp
|
* @return a random, signed Bulletin Board Message containing random data, tags and timestamp
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, int dataSize, int tagNumber)
|
public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, int dataSize, int tagNumber)
|
||||||
throws SignatureException {
|
throws SignatureException {
|
||||||
|
|
||||||
|
@ -115,4 +131,97 @@ public class BulletinBoardMessageGenerator {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a complete instance of a CompleteBatch
|
||||||
|
* @param signer contains the credentials required to sign the message
|
||||||
|
* @param batchId is the (per-signer) batch-ID
|
||||||
|
* @param timestamp is the time at which the message was generated
|
||||||
|
* @param dataCount is the number of Batch Data in the batch
|
||||||
|
* @param dataSize is the number of bytes per Batch Data
|
||||||
|
* @param tagCount is the number of tags
|
||||||
|
* @param tags contains a list of tags to be added (in addition to the random ones)
|
||||||
|
* @return a random, signed CompleteBatch containing random data and tags
|
||||||
|
* @throws SignatureException if an error occurs while signing the batch
|
||||||
|
*/
|
||||||
|
public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, Timestamp timestamp, int dataCount, int dataSize, int tagCount, List<String> tags) throws SignatureException {
|
||||||
|
|
||||||
|
CompleteBatch result = new CompleteBatch(BeginBatchMessage.newBuilder()
|
||||||
|
.setSignerId(signer.getSignerID())
|
||||||
|
.setBatchId(batchId)
|
||||||
|
.addAllTag(tags)
|
||||||
|
.addAllTag(randomStrings(tagCount))
|
||||||
|
.build());
|
||||||
|
|
||||||
|
List<BatchData> batchDataList = new LinkedList<>();
|
||||||
|
|
||||||
|
for (int i = 0 ; i < dataCount ; i++) {
|
||||||
|
batchDataList.add(BatchData.newBuilder()
|
||||||
|
.setData(ByteString.copyFrom(randomBytes(dataSize)))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
result.appendBatchData(batchDataList);
|
||||||
|
|
||||||
|
result.setTimestamp(timestamp);
|
||||||
|
|
||||||
|
signer.updateContent(result);
|
||||||
|
|
||||||
|
result.setSignature(signer.sign());
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a complete instance of a CompleteBatch
|
||||||
|
* @param signer contains the credentials required to sign the message
|
||||||
|
* @param batchId is the (per-signer) batch-ID
|
||||||
|
* @param timestamp is the time at which the message was generated
|
||||||
|
* @param dataCount is the number of Batch Data in the batch
|
||||||
|
* @param dataSize is the number of bytes per Batch Data
|
||||||
|
* @param tagCount is the number of tags
|
||||||
|
* @return a random, signed CompleteBatch containing random data and tags
|
||||||
|
* @throws SignatureException if an error occurs while signing the batch
|
||||||
|
*/
|
||||||
|
public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, Timestamp timestamp, int dataCount, int dataSize, int tagCount) throws SignatureException {
|
||||||
|
return generateRandomBatch(signer, batchId, timestamp, dataCount, dataSize, tagCount, new LinkedList<String>());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a complete instance of a CompleteBatch
|
||||||
|
* @param signer contains the credentials required to sign the message
|
||||||
|
* @param batchId is the (per-signer) batch-ID
|
||||||
|
* @param dataCount is the number of Batch Data in the batch
|
||||||
|
* @param dataSize is the number of bytes per Batch Data
|
||||||
|
* @param tagCount is the number of tags
|
||||||
|
* @param tags contains a list of tags to be added (in addition to the random ones)
|
||||||
|
* @return a random, signed CompleteBatch containing random data, tags and timestamp
|
||||||
|
* @throws SignatureException if an error occurs while signing the batch
|
||||||
|
*/
|
||||||
|
public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, int dataCount, int dataSize, int tagCount, List<String> tags) throws SignatureException {
|
||||||
|
|
||||||
|
Timestamp timestamp = Timestamp.newBuilder()
|
||||||
|
.setSeconds(random.nextLong())
|
||||||
|
.setNanos(random.nextInt())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return generateRandomBatch(signer, batchId, timestamp, dataCount, dataSize, tagCount, tags);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a complete instance of a CompleteBatch
|
||||||
|
* @param signer contains the credentials required to sign the message
|
||||||
|
* @param batchId is the (per-signer) batch-ID
|
||||||
|
* @param dataCount is the number of Batch Data in the batch
|
||||||
|
* @param dataSize is the number of bytes per Batch Data
|
||||||
|
* @param tagCount is the number of tags
|
||||||
|
* @return a random, signed CompleteBatch containing random data, tags and timestamp
|
||||||
|
* @throws SignatureException if an error occurs while signing the batch
|
||||||
|
*/
|
||||||
|
public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, int dataCount, int dataSize, int tagCount) throws SignatureException {
|
||||||
|
return generateRandomBatch(signer, batchId, dataCount, dataSize, tagCount, new LinkedList<String>());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue