diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index ea0e93d..24cd6bf 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -381,11 +381,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void onSuccess(Boolean msg) { closeBatch( - CloseBatchMessage.newBuilder() - .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) - .setSig(completeBatch.getSignature()) - .setBatchLength(completeBatch.getBatchDataList().size()) - .build(), + completeBatch.getCloseBatchMessage(), callback ); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java index 11fc777..57b336e 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java @@ -3,6 +3,7 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.rest.Constants; @@ -12,6 +13,9 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.util.List; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; @@ -40,29 +44,33 @@ public class SingleServerReadBatchWorker extends SingleServerWorker inputStream = null; try { - // If a BatchDataList is returned: the read was successful - return response.readEntity(BatchDataList.class).getDataList(); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BatchData.class); - } catch (ProcessingException | IllegalStateException e) { + return inputStream.asList(); + + } catch (IOException | InvocationTargetException e) { // Read failed - throw new CommunicationException("Could not contact the server"); + throw new CommunicationException("Could not contact the server or server returned illegal result"); - } - finally { - response.close(); + } catch (NoSuchMethodException | IllegalAccessException e) { + + throw new CommunicationException("MessageInputStream error"); + + } finally { + try { + inputStream.close(); + } catch (IOException ignored) {} } } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java index 6c09bcc..d8525ab 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java @@ -2,6 +2,8 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList; import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; @@ -13,6 +15,9 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.util.List; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; @@ -38,30 +43,33 @@ public class SingleServerReadMessagesWorker extends SingleServerWorker inputStream = null; try { - // If a BulletinBoardMessageList is returned: the read was successful - return response.readEntity(BulletinBoardMessageList.class).getMessageList(); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BulletinBoardMessage.class); - } catch (ProcessingException | IllegalStateException e) { + return inputStream.asList(); + + } catch (IOException | InvocationTargetException e) { // Read failed - throw new CommunicationException("Could not contact the server"); + throw new CommunicationException("Could not contact the server or server returned illegal result"); - } - finally { - response.close(); - } + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new CommunicationException("MessageInputStream error"); + + } finally { + try { + inputStream.close(); + } catch (IOException ignored) {} + } } diff --git a/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java index dfccebc..59aa615 100644 --- a/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java @@ -1,5 +1,6 @@ import com.google.common.util.concurrent.FutureCallback; -import com.google.protobuf.ByteString; +import com.google.protobuf.*; +import com.google.protobuf.Timestamp; import meerkat.bulletinboard.AsyncBulletinBoardClient; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.GenericBatchDigitalSignature; @@ -284,6 +285,11 @@ public class ThreadedBulletinBoardClientIntegrationTest { } + completeBatch.setTimestamp(Timestamp.newBuilder() + .setSeconds(Math.abs(90)) + .setNanos(50) + .build()); + signers[signer].updateContent(completeBatch); completeBatch.setSignature(signers[signer].sign()); @@ -357,6 +363,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { .addTag("Signature") .addTag("Trustee") .setData(ByteString.copyFrom(b1)) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(20) + .setNanos(30) + .build()) .build()) .addSig(Crypto.Signature.newBuilder() .setType(Crypto.SignatureType.DSA) @@ -440,6 +450,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() .setBatchId(BATCH_ID) .setBatchLength(BATCH_LENGTH) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(50) + .setNanos(80) + .build()) .setSig(completeBatch.getSignature()) .build(); @@ -525,6 +539,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { .setBatchId(NON_EXISTENT_BATCH_ID) .setBatchLength(1) .setSig(Crypto.Signature.getDefaultInstance()) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(9) + .setNanos(12) + .build()) .build(); // Try to close the (unopened) batch; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index b4e2949..c7bcf57 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -446,9 +446,10 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ sql = sqlQueryProvider.getSQLString(QueryType.FIND_MSG_ID); Map namedParameters = new HashMap(); + namedParameters.put(QueryType.FIND_MSG_ID.getParamName(0),msgID); - List entryNums = jdbcTemplate.query(sql, new MapSqlParameterSource(namedParameters), new LongMapper()); + List entryNums = jdbcTemplate.query(sql, namedParameters, new LongMapper()); if (entryNums.size() > 0){ @@ -462,7 +463,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ namedParameters.put(QueryType.INSERT_MSG.getParamName(2), msg.getMsg().toByteArray()); KeyHolder keyHolder = new GeneratedKeyHolder(); - jdbcTemplate.update(sql,new MapSqlParameterSource(namedParameters),keyHolder); + jdbcTemplate.update(sql, new MapSqlParameterSource(namedParameters), keyHolder); entryNum = keyHolder.getKey().longValue(); @@ -785,6 +786,9 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ .build() ); + // Add timestamp to CompleteBatch + completeBatch.setTimestamp(message.getTimestamp()); + // Add actual batch data to CompleteBatch sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA); @@ -903,20 +907,20 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ long lastEntryNum = getLastMessageEntry(); - long checksum = 0; - Iterator queryIterator = syncQuery.getQueryList().iterator(); SingleSyncQuery currentQuery = queryIterator.next(); List messageStubs = readMessageStubs(syncQuery.getFilterList()); + Checksum checksum = new SimpleChecksum(); + for (BulletinBoardMessage message : messageStubs){ // Check for end of current query if (timestampComparator.compare(message.getMsg().getTimestamp(), currentQuery.getTimeOfSync()) > 0){ - if (checksum == currentQuery.getChecksum()){ + if (checksum.getChecksum() == currentQuery.getChecksum()){ lastTimeOfSync = currentQuery.getTimeOfSync(); } else { break; @@ -932,13 +936,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ // Advance checksum - ByteString messageID = message.getMsg().getData(); + ByteString messageID = message.getMsg().getData(); // The data field contains the message ID - checksum &= messageID.byteAt(0) & messageID.byteAt(1) & messageID.byteAt(2) & messageID.byteAt(3); + checksum.update(messageID); } - if (checksum == currentQuery.getChecksum()){ + if (checksum.getChecksum() == currentQuery.getChecksum()){ lastTimeOfSync = currentQuery.getTimeOfSync(); } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java index b581b87..b64412d 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java @@ -4,6 +4,7 @@ import meerkat.protobuf.BulletinBoardAPI.*; import org.sqlite.SQLiteDataSource; import javax.sql.DataSource; +import java.text.MessageFormat; import java.util.LinkedList; import java.util.List; @@ -25,19 +26,97 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi switch(queryType) { case ADD_SIGNATURE: return "INSERT OR IGNORE INTO SignatureTable (EntryNum, SignerId, Signature) VALUES (:EntryNum,:SignerId,:Signature)"; + case CONNECT_TAG: return "INSERT OR IGNORE INTO MsgTagTable (TagId, EntryNum)" + " SELECT TagTable.TagId, :EntryNum AS EntryNum FROM TagTable WHERE Tag = :Tag"; + case FIND_MSG_ID: return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId"; + + case FIND_TAG_ID: + return MessageFormat.format( + "SELECT TagId FROM TagTable WHERE Tag = :{0}", + QueryType.FIND_TAG_ID.getParamName(0)); + case GET_MESSAGES: return "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable"; + + case COUNT_MESSAGES: + return "SELECT COUNT(MsgTable.EntryNum) FROM MsgTable"; + + case GET_MESSAGE_STUBS: + return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable"; + case GET_SIGNATURES: return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum"; + case INSERT_MSG: return "INSERT INTO MsgTable (MsgId, Msg) VALUES(:MsgId,:Msg)"; + case INSERT_NEW_TAG: return "INSERT OR IGNORE INTO TagTable(Tag) VALUES (:Tag)"; + + case GET_LAST_MESSAGE_ENTRY: + return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable"; + + case GET_BATCH_MESSAGE_ENTRY: + return MessageFormat.format( + "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable" + + " INNER JOIN SignatureTable ON MsgTable.EntryNum = SignatureTable.EntryNum" + + " INNER JOIN MsgTagTable ON MsgTable.EntryNum = MsgTagTable.EntryNum" + + " INNER JOIN TagTable ON MsgTagTable.TagId = TagTable.TagId" + + " WHERE SignatureTable.SignerId = :{0}" + + " AND TagTable.Tag = :{1}", + QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(0), + QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(1)); + + case GET_BATCH_MESSAGE_DATA: + return MessageFormat.format( + "SELECT Data FROM BatchTable" + + " WHERE SignerId = :{0} AND BatchId = :{1} AND SerialNum >= :{2}" + + " ORDER BY SerialNum ASC", + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0), + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1), + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2)); + + case INSERT_BATCH_DATA: + return MessageFormat.format( + "INSERT INTO BatchTable (SignerId, BatchId, SerialNum, Data)" + + " VALUES (:{0}, :{1}, :{2}, :{3})", + QueryType.INSERT_BATCH_DATA.getParamName(0), + QueryType.INSERT_BATCH_DATA.getParamName(1), + QueryType.INSERT_BATCH_DATA.getParamName(2), + QueryType.INSERT_BATCH_DATA.getParamName(3)); + + case CHECK_BATCH_LENGTH: + return MessageFormat.format( + "SELECT COUNT(Data) AS BatchLength FROM BatchTable" + + " WHERE SignerId = :{0} AND BatchId = :{1}", + QueryType.CHECK_BATCH_LENGTH.getParamName(0), + QueryType.CHECK_BATCH_LENGTH.getParamName(1)); + + case CONNECT_BATCH_TAG: + return MessageFormat.format( + "INSERT INTO BatchTagTable (SignerId, BatchId, TagId) SELECT :{0}, :{1}, TagId FROM TagTable" + + " WHERE Tag = :{2}", + QueryType.CONNECT_BATCH_TAG.getParamName(0), + QueryType.CONNECT_BATCH_TAG.getParamName(1), + QueryType.CONNECT_BATCH_TAG.getParamName(2)); + + case GET_BATCH_TAGS: + return MessageFormat.format( + "SELECT Tag FROM TagTable INNER JOIN BatchTagTable ON TagTable.TagId = BatchTagTable.TagId" + + " WHERE SignerId = :{0} AND BatchId = :{1} ORDER BY Tag ASC", + QueryType.GET_BATCH_TAGS.getParamName(0), + QueryType.GET_BATCH_TAGS.getParamName(1)); + + case REMOVE_BATCH_TAGS: + return MessageFormat.format( + "DELETE FROM BatchTagTable WHERE SignerId = :{0} AND BatchId = :{1}", + QueryType.REMOVE_BATCH_TAGS.getParamName(0), + QueryType.REMOVE_BATCH_TAGS.getParamName(1)); + default: throw new IllegalArgumentException("Cannot serve a query of type " + queryType); } @@ -52,21 +131,34 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi switch(filterType) { case EXACT_ENTRY: return "MsgTable.EntryNum = :EntryNum" + serialString; + case MAX_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MIN_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MAX_MESSAGES: return "LIMIT = :Limit" + serialString; + case MSG_ID: return "MsgTable.MsgId = :MsgId" + serialString; + case SIGNER_ID: return "EXISTS (SELECT 1 FROM SignatureTable" + " WHERE SignatureTable.SignerId = :SignerId" + serialString + " AND SignatureTable.EntryNum = MsgTable.EntryNum)"; + case TAG: return "EXISTS (SELECT 1 FROM TagTable" + " INNER JOIN MsgTagTable ON TagTable.TagId = MsgTagTable.TagId" + " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)"; + + case BEFORE_TIME: + return "MsgTable.ExactTime <= :TimeStamp"; + + case AFTER_TIME: + return "MsgTable.ExactTime >= :TimeStamp"; + default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); } diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java index e33c739..affb1a3 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java @@ -24,9 +24,12 @@ import meerkat.comm.CommunicationException; import meerkat.comm.MessageInputStream; import meerkat.comm.MessageOutputStream; import meerkat.comm.MessageInputStream.MessageInputStreamFactory; +import meerkat.crypto.Digest; import meerkat.crypto.concrete.ECDSASignature; +import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; -import meerkat.util.BulletinBoardUtils; +import meerkat.util.BulletinBoardMessageGenerator; +import org.h2.util.DateTimeUtils; import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; @@ -59,6 +62,10 @@ public class GenericBulletinBoardServerTest { private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); // Used to time the tests + private BulletinBoardMessageGenerator bulletinBoardMessageGenerator; + + private Digest digest; + /** * @param bulletinBoardServer is an initialized server. * @throws InstantiationException @@ -121,8 +128,12 @@ public class GenericBulletinBoardServerTest { System.err.println("Couldn't find signing key " + e.getMessage()); fail("Couldn't find signing key " + e.getMessage()); } - - random = new Random(0); // We use insecure randomness in tests for repeatability + + // We use insecure randomness in tests for repeatability + random = new Random(0); + bulletinBoardMessageGenerator = new BulletinBoardMessageGenerator(random); + + digest = new SHA256Digest(); long end = threadBean.getCurrentThreadCpuTime(); System.err.println("Finished initializing GenericBulletinBoardServerTest"); @@ -182,7 +193,10 @@ public class GenericBulletinBoardServerTest { for (i = 1; i <= MESSAGE_NUM; i++) { unsignedMsgBuilder = UnsignedBulletinBoardMessage.newBuilder() .setData(ByteString.copyFrom(data[i - 1])) - .setTimestamp(BulletinBoardUtils.toTimestampProto()); + .setTimestamp(Timestamp.newBuilder() + .setSeconds(i) + .setNanos(i) + .build()); // Add tags based on bit-representation of message number. @@ -619,6 +633,74 @@ public class GenericBulletinBoardServerTest { } } + + public void testSyncQuery() + throws SignatureException, CommunicationException, IOException,NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + Checksum checksum = new SimpleChecksum(); + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(1) + .setNanos(0) + .build(); + + BulletinBoardMessage newMessage = bulletinBoardMessageGenerator.generateRandomMessage(signers, timestamp, 10, 10); + + BoolMsg result = bulletinBoardServer.postMessage(newMessage); + assertThat("Failed to post message to BB Server", result.getValue(), is(true)); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + digest.update(newMessage.getMsg()); + + ByteString messageID = ByteString.copyFrom(digest.digest()); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(messageID) + .build()) + .build(); + + bulletinBoardServer.readMessages(filterList, new MessageOutputStream(outputStream)); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream(new ByteArrayInputStream( + outputStream.toByteArray()), + BulletinBoardMessage.class); + + long lastEntry = inputStream.asList().get(0).getEntryNum(); + + SyncQuery syncQuery = SyncQuery.newBuilder() + .setFilterList(MessageFilterList.getDefaultInstance()) + .addQuery(SingleSyncQuery.newBuilder() + .setChecksum(2) + .setTimeOfSync(Timestamp.newBuilder() + .setSeconds(2) + .setNanos(0) + .build()) + .build()) + .build(); + + SyncQueryResponse queryResponse = bulletinBoardServer.querySync(syncQuery); + + assertThat("Sync query replies with positive sync when no sync was expected", queryResponse.getLastEntryNum(), is(equalTo(-1l))); + + syncQuery = SyncQuery.newBuilder() + .setFilterList(MessageFilterList.getDefaultInstance()) + .addQuery(SingleSyncQuery.newBuilder() + .setChecksum(checksum.getChecksum(messageID)) + .setTimeOfSync(timestamp) + .build()) + .build(); + + queryResponse = bulletinBoardServer.querySync(syncQuery); + + assertThat("Sync query reply contained wrong last entry number", lastEntry, is(equalTo(queryResponse.getLastEntryNum()))); + + assertThat("Sync query reply contained wrong timestamp", timestamp, is(equalTo(queryResponse.getLastTimeOfSync()))); + + } public void close(){ signers[0].clearSigningKey(); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java index 42c11f7..273a53f 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java @@ -8,8 +8,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationTargetException; +import java.security.SignatureException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -144,6 +147,16 @@ public class MySQLBulletinBoardServerTest { } + @Test + public void testSyncQuery() { + try { + serverTest.testSyncQuery(); + } catch (Exception e) { + System.err.println(e.getMessage()); + fail(e.getMessage()); + } + } + @After public void close() { System.err.println("Starting to close MySQLBulletinBoardServerTest"); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java b/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java new file mode 100644 index 0000000..b8ddb0b --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java @@ -0,0 +1,122 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import meerkat.crypto.Digest; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.MessageID; + +import java.util.Collection; + +/** + * Created by Arbel Deutsch Peled on 01-Mar-16. + * This interface is used to create checksums of Bulletin Board messages IDs + * This is useful in comparing database states + */ +public interface Checksum { + + /** + * Sets the Digest method which is used in creating message IDs from the messages themselves + * This method must be called with an initialized Digest before calling any methods that receive a parameter of type BulletinBoardMessage + * @param digest is the Digest that will be used to create message IDs from Bulletin Board Messages + */ + public void setDigest(Digest digest); + + /** + * Used to reset the current checksum state + */ + public void reset(); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(MessageID messageID); + + /** + * Update the current checksum with the given collection of IDs + * @param messageIDs contains the message IDs + */ + public void update(Collection messageIDs); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(ByteString messageID); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(byte[] messageID); + + /** + * Update the current checksum with the message ID of the given message + * @param bulletinBoardMessage is the message whose ID should be added to the checksum + * @throws IllegalStateException if a Digest has not been set before calling this method + */ + public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException; + + + /** + * Update the current checksum with the message IDs of the given messages + * @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum + * @throws IllegalStateException if a Digest has not been set before calling this method + */ + public void digestAndUpdate(Collection bulletinBoardMessages) throws IllegalStateException; + + /** + * Returns the current checksum without changing the checksum state + * @return the current checksum + */ + public long getChecksum(); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(MessageID messageID); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(ByteString messageID); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(byte[] messageID); + + /** + * Updates the current checksum with the given IDs and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageIDs contains the message IDs to be added + * @return the updated checksum + */ + public long getChecksum(Collection messageIDs); + + /** + * Updates the current checksum with the message ID of the given message + * The checksum is not reset afterwards + * @param bulletinBoardMessage is the message whose ID should be added to the checksum + * @return the updated checksum + */ + public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException; + + /** + * Updates the current checksum with the message IDs of the given messages + * The checksum is not reset afterwards + * @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum + * @return the updated checksum + */ + public long digestAndGetChecksum(Collection bulletinBoardMessages) throws IllegalStateException; + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java index 4f25f59..852bb24 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java @@ -29,6 +29,8 @@ public class GenericBatchDigest implements BatchDigest{ update(batchData); } + update(completeBatch.getTimestamp()); + } @Override diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java index 7174b42..7327b8e 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java @@ -32,10 +32,13 @@ public class GenericBatchDigitalSignature implements BatchDigitalSignature{ public void updateContent(CompleteBatch completeBatch) throws SignatureException { digitalSignature.updateContent(completeBatch.getBeginBatchMessage()); + for (BatchData batchData : completeBatch.getBatchDataList()) { digitalSignature.updateContent(batchData); } + digitalSignature.updateContent(completeBatch.getTimestamp()); + } @Override diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java new file mode 100644 index 0000000..90d7ae5 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java @@ -0,0 +1,131 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import meerkat.crypto.Digest; +import meerkat.protobuf.BulletinBoardAPI.MessageID; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; + +import java.util.Collection; + +/** + * Created by Arbel Deutsch Peled on 01-Mar-16. + * Implementation of Checksum via bitwise XOR of the bytes of message IDs + */ +public class SimpleChecksum implements Checksum{ + + private Digest digest; + private long checksum; + + public SimpleChecksum() { + digest = null; + reset(); + } + + @Override + public void setDigest(Digest digest) { + this.digest = digest; + } + + @Override + public void reset() { + checksum = 0; + } + + @Override + public void update(MessageID messageID) { + ByteString messageIDByteString = messageID.getID(); + update(messageIDByteString); + } + + @Override + public void update(Collection messageIDs) { + + for (MessageID messageID : messageIDs){ + update(messageID); + } + + } + + @Override + public void update(ByteString messageID) { + for (int i = 0 ; i < messageID.size() ; i++){ + checksum &= messageID.byteAt(i); + } + } + + @Override + public void update(byte[] messageID) { + for (int i = 0 ; i < messageID.length ; i++){ + checksum &= messageID[i]; + } + } + + private void checkDigest() throws IllegalStateException { + + if (digest == null){ + throw new IllegalStateException("Digest method not set. Use setDigest method before calling digestAndUpdate."); + } + + } + + @Override + public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException { + + checkDigest(); + + digest.reset(); + digest.update(bulletinBoardMessage); + update(digest.digest()); + + } + + @Override + public void digestAndUpdate(Collection bulletinBoardMessages) throws IllegalStateException { + + for (BulletinBoardMessage bulletinBoardMessage : bulletinBoardMessages){ + digestAndUpdate(bulletinBoardMessage); + } + + } + + @Override + public long getChecksum() { + return checksum; + } + + @Override + public long getChecksum(MessageID messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(ByteString messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(byte[] messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(Collection messageIDs) { + update(messageIDs); + return getChecksum(); + } + + @Override + public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException { + digestAndUpdate(bulletinBoardMessage); + return getChecksum(); + } + + @Override + public long digestAndGetChecksum(Collection bulletinBoardMessages) throws IllegalStateException { + digestAndUpdate(bulletinBoardMessages); + return getChecksum(); + } +} diff --git a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java index 33deb3f..b1e5255 100644 --- a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java +++ b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java @@ -5,6 +5,7 @@ import com.google.protobuf.Message; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -12,7 +13,7 @@ import java.util.List; * Created by Arbel Deutsch Peled on 21-Feb-16. * A input stream of Protobuf messages */ -public class MessageInputStream{ +public class MessageInputStream implements Iterable{ private T.Builder builder; @@ -23,6 +24,33 @@ public class MessageInputStream{ this.builder = (T.Builder) type.getMethod("newBuilder").invoke(type); } + @Override + public Iterator iterator() { + + return new Iterator() { + + @Override + public boolean hasNext() { + try { + return isAvailable(); + } catch (IOException e) { + return false; + } + } + + @Override + public T next() { + try { + return readMessage(); + } catch (IOException e) { + return null; + } + } + + }; + + } + /** * Factory class for actually creating a MessageInputStream */ @@ -61,4 +89,10 @@ public class MessageInputStream{ } + public void close() throws IOException { + + in.close(); + + } + } diff --git a/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java b/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java index b55bc8e..f72c04c 100644 --- a/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java +++ b/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java @@ -21,4 +21,8 @@ public class MessageOutputStream { message.writeDelimitedTo(out); } + public void close() throws IOException { + out.close(); + } + } diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java index d8b362c..8793b2d 100644 --- a/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java @@ -80,7 +80,7 @@ public class BulletinBoardUtils { * This method creates a Timestamp Protobuf from the current system time * @return a Timestamp Protobuf encoding of the current system time */ - public static com.google.protobuf.Timestamp toTimestampProto() { + public static com.google.protobuf.Timestamp getCurrentTimestampProto() { return toTimestampProto(System.currentTimeMillis());