From 7c60e487cc129d791e3bdb1c9c8a69f9d7e14507 Mon Sep 17 00:00:00 2001 From: "arbel.peled" Date: Wed, 1 Jun 2016 21:34:17 +0300 Subject: [PATCH] Created a test for the Synchronizer. Not passing yet. --- .../SimpleBulletinBoardClient.java | 57 ++--- .../SimpleBulletinBoardSynchronizer.java | 110 +++++++- .../SingleServerGetRedundancyWorker.java | 2 - .../BulletinBoardSynchronizerTest.java | 241 ++++++++++++++++++ .../sqlserver/BulletinBoardSQLServer.java | 3 - .../sqlserver/H2QueryProvider.java | 8 +- .../BulletinBoardSynchronizer.java | 4 +- .../util/BulletinBoardMessageGenerator.java | 133 +++++++++- 8 files changed, 490 insertions(+), 68 deletions(-) create mode 100644 bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 28252ae..579ecf9 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -1,22 +1,13 @@ package meerkat.bulletinboard; import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; import meerkat.bulletinboard.workers.singleserver.*; import meerkat.comm.CommunicationException; -import meerkat.comm.MessageInputStream; -import meerkat.crypto.Digest; import meerkat.crypto.concrete.SHA256Digest; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; import meerkat.rest.*; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import javax.ws.rs.client.Client; @@ -68,7 +59,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { WebTarget webTarget; - Response response; + Response response = null; // Post message to all databases try { @@ -80,10 +71,17 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ if (response.getStatusInfo() == Response.Status.OK || response.getStatusInfo() == Response.Status.CREATED) { response.readEntity(BoolMsg.class).getValue(); + } else { + throw new CommunicationException("Server returned error. Status was: " + response.getStatus()); } } } catch (Exception e) { // Occurs only when server replies with valid status but invalid data - throw new CommunicationException("Error accessing database: " + e.getMessage()); + + throw new CommunicationException("Server returned invalid data type: " + e.getMessage()); + + } finally { + if (response != null) + response.close(); } // Calculate the correct message ID and return it @@ -100,36 +98,33 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{ int batchID = completeBatch.getBeginBatchMessage().getBatchId(); // Post message to all databases - try { - for (String db : meerkatDBs) { - SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0); + for (String db : meerkatDBs) { - beginBatchWorker.call(); + SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0); - BatchMessage.Builder builder = BatchMessage.newBuilder().setSignerId(signerID).setBatchId(batchID); + beginBatchWorker.call(); - for (BatchData batchData : completeBatch.getBatchDataList()) { + BatchMessage.Builder builder = BatchMessage.newBuilder().setSignerId(signerID).setBatchId(batchID); - SingleServerPostBatchWorker postBatchWorker = - new SingleServerPostBatchWorker( - db, - builder.setData(batchData).setSerialNum(pos).build(), - 0); + for (BatchData batchData : completeBatch.getBatchDataList()) { - postBatchWorker.call(); + SingleServerPostBatchWorker postBatchWorker = + new SingleServerPostBatchWorker( + db, + builder.setData(batchData).setSerialNum(pos).build(), + 0); - pos++; + postBatchWorker.call(); - } - - SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(db, completeBatch.getCloseBatchMessage(), 0); - - closeBatchWorker.call(); + pos++; } - } catch (Exception e) { // Occurs only when server replies with valid status but invalid data - throw new CommunicationException("Error accessing database: " + e.getMessage()); + + SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(db, completeBatch.getCloseBatchMessage(), 0); + + closeBatchWorker.call(); + } digest.update(completeBatch); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java index d96864a..f7e9baa 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -8,6 +8,8 @@ import meerkat.util.BulletinBoardUtils; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * Created by Arbel on 13/04/2016. @@ -25,24 +27,75 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance(); private static final int SLEEP_INTERVAL = 10000; // 10 Seconds + private static final int WAIT_CAP = 300000; // 5 minutes wait before deciding that the sync has failed fatally + private Semaphore semaphore; + + /** + * This class is a callback that deletes a message if it has been successfully posted + * It also calls a stored callback + */ private class MessageDeleteCallback implements FutureCallback { private final long entryNum; + private final FutureCallback callback; - public MessageDeleteCallback(long entryNum) { + public MessageDeleteCallback(long entryNum, FutureCallback callback) { this.entryNum = entryNum; + this.callback = callback; } @Override public void onSuccess(Boolean result) { // Success: delete from database localClient.deleteMessage(entryNum, null); + callback.onSuccess(null); } @Override public void onFailure(Throwable t) { - // Ignore + callback.onFailure(t); + } + + } + + /** + * This class aggregates the results from all of the post operations + * If any post has failed: it changes the sync status to SERVER_ERROR + * It also notifies the main sync loop when all uploads are finished + */ + private class SyncStatusUpdateCallback implements FutureCallback { + + private int count; + private boolean errorEncountered; + + public SyncStatusUpdateCallback(int count) { + this.count = count; + this.errorEncountered = false; + } + + private void handleStatusUpdate() { + count--; + if (count <= 0) { + + if (errorEncountered) + updateSyncStatus(SyncStatus.SERVER_ERROR); + + // Upload is done: wake up the synchronizer loop + semaphore.release(); + + } + } + + @Override + public void onSuccess(Void result) { + handleStatusUpdate(); + } + + @Override + public void onFailure(Throwable t) { + errorEncountered = true; + handleStatusUpdate(); } } @@ -52,6 +105,20 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void onSuccess(List result) { + // Notify Message Count callbacks if needed + + if (syncStatus != SyncStatus.SYNCHRONIZED || result.size() > 0) { + + for (FutureCallback callback : messageCountCallbacks){ + callback.onSuccess(result.size()); + } + + } + + // Handle upload and status change + + SyncStatusUpdateCallback syncStatusUpdateCallback = new SyncStatusUpdateCallback(result.size()); + SyncStatus newStatus = SyncStatus.PENDING; if (result.size() == 0) { @@ -79,20 +146,21 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage); - remoteClient.postBatch(completeBatch); + remoteClient.postBatch(completeBatch, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback)); } else { // This is a regular message: post it - - remoteClient.postMessage(message); + remoteClient.postMessage(message, new MessageDeleteCallback(message.getEntryNum(), syncStatusUpdateCallback)); } localClient.deleteMessage(message.getEntryNum()); } catch (CommunicationException e) { + // This is an error with the local server + // TODO: log updateSyncStatus(SyncStatus.SERVER_ERROR); } @@ -143,6 +211,8 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize messageCountCallbacks = new LinkedList<>(); syncStatusCallbacks = new LinkedList<>(); + semaphore = new Semaphore(0); + } @Override @@ -185,18 +255,32 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize while (syncStatus != SyncStatus.STOPPED) { - try { + do { + + if (syncStatus == SyncStatus.PENDING || syncStatus == SyncStatus.SERVER_ERROR) { - do { localClient.readMessages(EMPTY_FILTER, callback); - } while (syncStatus == SyncStatus.PENDING); - synchronized (this) { - this.wait(SLEEP_INTERVAL); } + try { + + semaphore.tryAcquire(WAIT_CAP, TimeUnit.MILLISECONDS); + //TODO: log hard error. Too much time trying to upload data. + + } catch (InterruptedException ignored) { + // We expect an interruption when the upload will complete + } + + + } while (syncStatus == SyncStatus.PENDING); + + // Database is synced. Wait for new data. + + try { + semaphore.tryAcquire(SLEEP_INTERVAL, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + //TODO: log (probably nudged) } } @@ -207,9 +291,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void nudge() { - synchronized (this) { - this.notify(); - } + semaphore.release(); } @Override diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java index 0401a76..23c07af 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerGetRedundancyWorker.java @@ -6,7 +6,6 @@ import meerkat.comm.MessageInputStream; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.rest.Constants; -import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -14,7 +13,6 @@ import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java new file mode 100644 index 0000000..a3ace28 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java @@ -0,0 +1,241 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; + +import static meerkat.bulletinboard.BulletinBoardSynchronizer.SyncStatus; + +import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer; +import meerkat.bulletinboard.sqlserver.H2QueryProvider; + +import meerkat.comm.CommunicationException; +import meerkat.crypto.concrete.ECDSASignature; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardMessageGenerator; +import org.junit.*; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; + +import java.io.IOException; +import java.io.InputStream; +import java.security.*; +import java.security.cert.CertificateException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +/** + * Created by Arbel on 6/1/2016. + */ +public class BulletinBoardSynchronizerTest { + + private static final String REMOTE_SERVER_ADDRESS = "remoteDB"; + private static final String LOCAL_SERVER_ADDRESS = "localDB"; + + private static final int THREAD_NUM = 3; + private static final int SUBSCRIPTION_INTERVAL = 1000; + + private DeletableSubscriptionBulletinBoardClient localClient; + private AsyncBulletinBoardClient remoteClient; + + private BulletinBoardSynchronizer synchronizer; + + private static BulletinBoardMessageGenerator messageGenerator; + private static BulletinBoardMessageComparator messageComparator; + + private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; + private static String KEYFILE_PASSWORD1 = "secret"; + private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; + + private static GenericBatchDigitalSignature[] signers; + private static ByteString[] signerIDs; + + private Semaphore semaphore; + private List thrown; + + @BeforeClass + public static void build() { + + messageGenerator = new BulletinBoardMessageGenerator(new Random(0)); + messageComparator = new BulletinBoardMessageComparator(); + + signers = new GenericBatchDigitalSignature[1]; + signerIDs = new ByteString[1]; + + signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); + + InputStream keyStream = BulletinBoardSynchronizerTest.class.getResourceAsStream(KEYFILE_EXAMPLE); + char[] password = KEYFILE_PASSWORD1.toCharArray(); + + try { + + KeyStore.Builder keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); + + signers[0].loadSigningCertificate(keyStoreBuilder); + + signers[0].loadVerificationCertificates(BulletinBoardSynchronizerTest.class.getResourceAsStream(CERT1_PEM_EXAMPLE)); + + } catch (IOException e) { + System.err.println("Failed reading from signature file " + e.getMessage()); + fail("Failed reading from signature file " + e.getMessage()); + } catch (CertificateException e) { + System.err.println("Failed reading certificate " + e.getMessage()); + fail("Failed reading certificate " + e.getMessage()); + } catch (KeyStoreException e) { + System.err.println("Failed reading keystore " + e.getMessage()); + fail("Failed reading keystore " + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + System.err.println("Couldn't find signing algorithm " + e.getMessage()); + fail("Couldn't find signing algorithm " + e.getMessage()); + } catch (UnrecoverableKeyException e) { + System.err.println("Couldn't find signing key " + e.getMessage()); + fail("Couldn't find signing key " + e.getMessage()); + } + + signerIDs[0] = signers[0].getSignerID(); + + } + + @Before + public void init() throws CommunicationException { + + DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS)); + remoteServer.init(REMOTE_SERVER_ADDRESS); + + remoteClient = new LocalBulletinBoardClient( + remoteServer, + THREAD_NUM, + SUBSCRIPTION_INTERVAL); + + DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS)); + localServer.init(LOCAL_SERVER_ADDRESS); + + localClient = new LocalBulletinBoardClient( + localServer, + THREAD_NUM, + SUBSCRIPTION_INTERVAL); + + synchronizer = new SimpleBulletinBoardSynchronizer(); + synchronizer.init(localClient, remoteClient); + + semaphore = new Semaphore(0); + thrown = new LinkedList<>(); + + } + + private class SyncStatusCallback implements FutureCallback { + + @Override + public void onSuccess(SyncStatus result) { + + if (result == SyncStatus.SYNCHRONIZED){ + semaphore.release(); + } + + } + + @Override + public void onFailure(Throwable t) { + thrown.add(t); + semaphore.release(); + } + } + + private class MessageCountCallback implements FutureCallback { + + private int[] expectedCounts; + private int currentIteration; + + public MessageCountCallback(int[] expectedCounts) { + this.expectedCounts = expectedCounts; + this.currentIteration = 0; + } + + @Override + public void onSuccess(Integer result) { + + if (currentIteration < expectedCounts.length){ + if (result != expectedCounts[currentIteration]){ + onFailure(new AssertionError("Wrong message count. Expected " + expectedCounts[currentIteration] + " but received " + result)); + currentIteration = expectedCounts.length; + return; + } + } + + currentIteration++; + + if (currentIteration == expectedCounts.length) + semaphore.release(); + } + + @Override + public void onFailure(Throwable t) { + thrown.add(t); + semaphore.release(); + } + } + + @Test + public void testSync() throws SignatureException, CommunicationException, InterruptedException { + + final int BATCH_ID = 1; + + BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10); + + MessageID msgID = localClient.postMessage(msg); + + CompleteBatch completeBatch = messageGenerator.generateRandomBatch(signers[0],BATCH_ID,10,10,10); + + localClient.postBatch(completeBatch); + + synchronizer.subscribeToSyncStatus(new SyncStatusCallback()); + + int[] expectedCounts = {2,0}; + synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts)); + + Thread t = new Thread(synchronizer); + t.run(); + + semaphore.acquire(); + + synchronizer.stop(); + t.join(); + + assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), thrown.size() == 0); + + List msgList = remoteClient.readMessages(MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgID.getID()) + .build()) + .build()); + + assertThat("Wrong number of messages returned.", msgList.size() == 1); + assertThat("Returned message is not equal to original one", messageComparator.compare(msgList.get(0),msg) == 0); + + CompleteBatch returnedBatch = remoteClient.readBatch(BatchSpecificationMessage.newBuilder() + .setSignerId(signerIDs[0]) + .setBatchId(BATCH_ID) + .build()); + + assertThat("Returned batch does not equal original one.", completeBatch.equals(returnedBatch)); + + } + + @After + public void close() { + + synchronizer.stop(); + localClient.close(); + remoteClient.close(); + + } + +} diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index 6b2ef4f..301fc8e 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -6,7 +6,6 @@ import java.util.*; import com.google.protobuf.*; import com.google.protobuf.Timestamp; -import com.sun.org.apache.xpath.internal.operations.Bool; import meerkat.bulletinboard.*; import meerkat.bulletinboard.sqlserver.mappers.*; import static meerkat.bulletinboard.BulletinBoardConstants.*; @@ -17,7 +16,6 @@ import meerkat.comm.MessageOutputStream; import meerkat.crypto.concrete.ECDSASignature; import meerkat.crypto.concrete.SHA256Digest; -import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Crypto.SignatureVerificationKey; @@ -29,7 +27,6 @@ import javax.sql.DataSource; import meerkat.util.BulletinBoardUtils; import meerkat.util.TimestampComparator; -import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.support.GeneratedKeyHolder; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java index bef6d68..f68548e 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java @@ -247,12 +247,12 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider list.add("CREATE TABLE IF NOT EXISTS TagTable (TagId INT NOT NULL AUTO_INCREMENT PRIMARY KEY, Tag VARCHAR(50) UNIQUE)"); list.add("CREATE TABLE IF NOT EXISTS MsgTagTable (EntryNum INT, TagId INT," - + " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum)," - + " FOREIGN KEY (TagId) REFERENCES TagTable(TagId)," + + " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum) ON DELETE CASCADE," + + " FOREIGN KEY (TagId) REFERENCES TagTable(TagId) ON DELETE CASCADE," + " UNIQUE (EntryNum, TagID))"); list.add("CREATE TABLE IF NOT EXISTS SignatureTable (EntryNum INT, SignerId TINYBLOB, Signature TINYBLOB UNIQUE," - + " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum))"); + + " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum) ON DELETE CASCADE)"); list.add("CREATE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId)"); list.add("CREATE UNIQUE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId, EntryNum)"); @@ -261,7 +261,7 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider + " UNIQUE(SignerId, BatchId, SerialNum))"); list.add("CREATE TABLE IF NOT EXISTS BatchTagTable (SignerId TINYBLOB, BatchId INT, TagId INT," - + " FOREIGN KEY (TagId) REFERENCES TagTable(TagId))"); + + " FOREIGN KEY (TagId) REFERENCES TagTable(TagId) ON DELETE CASCADE)"); list.add("CREATE INDEX IF NOT EXISTS BatchIndex ON BatchTagTable(SignerId, BatchId)"); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java index 569d4ef..c25d737 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardSynchronizer.java @@ -16,8 +16,8 @@ public interface BulletinBoardSynchronizer extends Runnable { public enum SyncStatus{ SYNCHRONIZED, // No more messages to upload - PENDING, // Synchronizer is uploading data - SERVER_ERROR, // Synchronizer encountered an error while uploading + PENDING, // Synchronizer is querying for data to upload and uploading it as needed + SERVER_ERROR, // Synchronizer encountered an error while uploading, but will retry STOPPED // Stopped/Not started by user } diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java index dff562e..ea25200 100644 --- a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java @@ -1,13 +1,14 @@ package meerkat.util; import com.google.protobuf.ByteString; +import meerkat.bulletinboard.BatchDigitalSignature; +import meerkat.bulletinboard.CompleteBatch; import meerkat.crypto.DigitalSignature; import meerkat.protobuf.BulletinBoardAPI.*; import com.google.protobuf.Timestamp; import java.math.BigInteger; import java.security.SignatureException; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -28,10 +29,33 @@ public class BulletinBoardMessageGenerator { return (byte) random.nextInt(); } + private byte[] randomBytes(int length) { + + byte[] result = new byte[length]; + + for (int i = 0; i < length; i++) { + result[i] = randomByte(); + } + + return result; + } + private String randomString(){ return new BigInteger(130, random).toString(32); } + private List randomStrings(int length) { + + List result = new LinkedList<>(); + + for (int i = 0; i < length; i++) { + result.add(randomString()); + } + + return result; + + } + /** * Generates a complete instance of a BulletinBoardMessage * @param signers contains the (possibly multiple) credentials required to sign the message @@ -46,23 +70,16 @@ public class BulletinBoardMessageGenerator { // Generate random data. - byte[] data = new byte[dataSize]; - String[] newTags = new String[tagNumber]; - for (int i = 0; i < dataSize; i++) { - data[i] = randomByte(); - } - for (int i = 0; i < tagNumber; i++) { - newTags[i] = randomString(); - } + UnsignedBulletinBoardMessage unsignedMessage = UnsignedBulletinBoardMessage.newBuilder() - .setData(ByteString.copyFrom(data)) + .setData(ByteString.copyFrom(randomBytes(dataSize))) .setTimestamp(timestamp) .addAllTag(tags) - .addAllTag(Arrays.asList(newTags)) + .addAllTag(randomStrings(tagNumber)) .build(); BulletinBoardMessage.Builder messageBuilder = @@ -102,7 +119,6 @@ public class BulletinBoardMessageGenerator { * @param tagNumber is the number of tags to generate * @return a random, signed Bulletin Board Message containing random data, tags and timestamp */ - public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, int dataSize, int tagNumber) throws SignatureException { @@ -115,4 +131,97 @@ public class BulletinBoardMessageGenerator { } + /** + * Generates a complete instance of a CompleteBatch + * @param signer contains the credentials required to sign the message + * @param batchId is the (per-signer) batch-ID + * @param timestamp is the time at which the message was generated + * @param dataCount is the number of Batch Data in the batch + * @param dataSize is the number of bytes per Batch Data + * @param tagCount is the number of tags + * @param tags contains a list of tags to be added (in addition to the random ones) + * @return a random, signed CompleteBatch containing random data and tags + * @throws SignatureException if an error occurs while signing the batch + */ + public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, Timestamp timestamp, int dataCount, int dataSize, int tagCount, List tags) throws SignatureException { + + CompleteBatch result = new CompleteBatch(BeginBatchMessage.newBuilder() + .setSignerId(signer.getSignerID()) + .setBatchId(batchId) + .addAllTag(tags) + .addAllTag(randomStrings(tagCount)) + .build()); + + List batchDataList = new LinkedList<>(); + + for (int i = 0 ; i < dataCount ; i++) { + batchDataList.add(BatchData.newBuilder() + .setData(ByteString.copyFrom(randomBytes(dataSize))) + .build()); + } + + result.appendBatchData(batchDataList); + + result.setTimestamp(timestamp); + + signer.updateContent(result); + + result.setSignature(signer.sign()); + + return result; + + } + + /** + * Generates a complete instance of a CompleteBatch + * @param signer contains the credentials required to sign the message + * @param batchId is the (per-signer) batch-ID + * @param timestamp is the time at which the message was generated + * @param dataCount is the number of Batch Data in the batch + * @param dataSize is the number of bytes per Batch Data + * @param tagCount is the number of tags + * @return a random, signed CompleteBatch containing random data and tags + * @throws SignatureException if an error occurs while signing the batch + */ + public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, Timestamp timestamp, int dataCount, int dataSize, int tagCount) throws SignatureException { + return generateRandomBatch(signer, batchId, timestamp, dataCount, dataSize, tagCount, new LinkedList()); + } + + + /** + * Generates a complete instance of a CompleteBatch + * @param signer contains the credentials required to sign the message + * @param batchId is the (per-signer) batch-ID + * @param dataCount is the number of Batch Data in the batch + * @param dataSize is the number of bytes per Batch Data + * @param tagCount is the number of tags + * @param tags contains a list of tags to be added (in addition to the random ones) + * @return a random, signed CompleteBatch containing random data, tags and timestamp + * @throws SignatureException if an error occurs while signing the batch + */ + public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, int dataCount, int dataSize, int tagCount, List tags) throws SignatureException { + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(random.nextLong()) + .setNanos(random.nextInt()) + .build(); + + return generateRandomBatch(signer, batchId, timestamp, dataCount, dataSize, tagCount, tags); + + } + + /** + * Generates a complete instance of a CompleteBatch + * @param signer contains the credentials required to sign the message + * @param batchId is the (per-signer) batch-ID + * @param dataCount is the number of Batch Data in the batch + * @param dataSize is the number of bytes per Batch Data + * @param tagCount is the number of tags + * @return a random, signed CompleteBatch containing random data, tags and timestamp + * @throws SignatureException if an error occurs while signing the batch + */ + public CompleteBatch generateRandomBatch(BatchDigitalSignature signer, int batchId, int dataCount, int dataSize, int tagCount) throws SignatureException { + return generateRandomBatch(signer, batchId, dataCount, dataSize, tagCount, new LinkedList()); + } + }