From 1cf14a60a805a17ebcc795d64a990317c4b414a3 Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Tue, 1 Mar 2016 13:56:18 +0200 Subject: [PATCH] Bulletin Board Client support for streaming and Timestamps Created standard Checksum interface and implementation for Sync Query mechanism Added the Timestamp into the Batch Digest and Signature logic --- .../SingleServerBulletinBoardClient.java | 6 +- .../SingleServerReadBatchWorker.java | 30 ++-- .../SingleServerReadMessagesWorker.java | 32 +++-- ...dedBulletinBoardClientIntegrationTest.java | 20 ++- .../sqlserver/BulletinBoardSQLServer.java | 20 +-- .../GenericBulletinBoardServerTest.java | 4 +- .../java/meerkat/bulletinboard/Checksum.java | 122 ++++++++++++++++ .../bulletinboard/GenericBatchDigest.java | 2 + .../GenericBatchDigitalSignature.java | 3 + .../meerkat/bulletinboard/SimpleChecksum.java | 131 ++++++++++++++++++ .../java/meerkat/comm/MessageInputStream.java | 6 + .../meerkat/comm/MessageOutputStream.java | 4 + 12 files changed, 342 insertions(+), 38 deletions(-) create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java create mode 100644 meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java index ea0e93d..24cd6bf 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -381,11 +381,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i @Override public void onSuccess(Boolean msg) { closeBatch( - CloseBatchMessage.newBuilder() - .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) - .setSig(completeBatch.getSignature()) - .setBatchLength(completeBatch.getBatchDataList().size()) - .build(), + completeBatch.getCloseBatchMessage(), callback ); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java index 11fc777..57b336e 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadBatchWorker.java @@ -3,6 +3,7 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.rest.Constants; @@ -12,6 +13,9 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.util.List; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; @@ -40,29 +44,33 @@ public class SingleServerReadBatchWorker extends SingleServerWorker inputStream = null; try { - // If a BatchDataList is returned: the read was successful - return response.readEntity(BatchDataList.class).getDataList(); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BatchData.class); - } catch (ProcessingException | IllegalStateException e) { + return inputStream.asList(); + + } catch (IOException | InvocationTargetException e) { // Read failed - throw new CommunicationException("Could not contact the server"); + throw new CommunicationException("Could not contact the server or server returned illegal result"); - } - finally { - response.close(); + } catch (NoSuchMethodException | IllegalAccessException e) { + + throw new CommunicationException("MessageInputStream error"); + + } finally { + try { + inputStream.close(); + } catch (IOException ignored) {} } } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java index 6c09bcc..d8525ab 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/singleserver/SingleServerReadMessagesWorker.java @@ -2,6 +2,8 @@ package meerkat.bulletinboard.workers.singleserver; import meerkat.bulletinboard.SingleServerWorker; import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; +import meerkat.protobuf.BulletinBoardAPI; import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList; import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; @@ -13,6 +15,9 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.util.List; import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; @@ -38,30 +43,33 @@ public class SingleServerReadMessagesWorker extends SingleServerWorker inputStream = null; try { - // If a BulletinBoardMessageList is returned: the read was successful - return response.readEntity(BulletinBoardMessageList.class).getMessageList(); + inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BulletinBoardMessage.class); - } catch (ProcessingException | IllegalStateException e) { + return inputStream.asList(); + + } catch (IOException | InvocationTargetException e) { // Read failed - throw new CommunicationException("Could not contact the server"); + throw new CommunicationException("Could not contact the server or server returned illegal result"); - } - finally { - response.close(); - } + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new CommunicationException("MessageInputStream error"); + + } finally { + try { + inputStream.close(); + } catch (IOException ignored) {} + } } diff --git a/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java index dfccebc..59aa615 100644 --- a/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/ThreadedBulletinBoardClientIntegrationTest.java @@ -1,5 +1,6 @@ import com.google.common.util.concurrent.FutureCallback; -import com.google.protobuf.ByteString; +import com.google.protobuf.*; +import com.google.protobuf.Timestamp; import meerkat.bulletinboard.AsyncBulletinBoardClient; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.GenericBatchDigitalSignature; @@ -284,6 +285,11 @@ public class ThreadedBulletinBoardClientIntegrationTest { } + completeBatch.setTimestamp(Timestamp.newBuilder() + .setSeconds(Math.abs(90)) + .setNanos(50) + .build()); + signers[signer].updateContent(completeBatch); completeBatch.setSignature(signers[signer].sign()); @@ -357,6 +363,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { .addTag("Signature") .addTag("Trustee") .setData(ByteString.copyFrom(b1)) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(20) + .setNanos(30) + .build()) .build()) .addSig(Crypto.Signature.newBuilder() .setType(Crypto.SignatureType.DSA) @@ -440,6 +450,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() .setBatchId(BATCH_ID) .setBatchLength(BATCH_LENGTH) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(50) + .setNanos(80) + .build()) .setSig(completeBatch.getSignature()) .build(); @@ -525,6 +539,10 @@ public class ThreadedBulletinBoardClientIntegrationTest { .setBatchId(NON_EXISTENT_BATCH_ID) .setBatchLength(1) .setSig(Crypto.Signature.getDefaultInstance()) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(9) + .setNanos(12) + .build()) .build(); // Try to close the (unopened) batch; diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index b4e2949..c7bcf57 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -446,9 +446,10 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ sql = sqlQueryProvider.getSQLString(QueryType.FIND_MSG_ID); Map namedParameters = new HashMap(); + namedParameters.put(QueryType.FIND_MSG_ID.getParamName(0),msgID); - List entryNums = jdbcTemplate.query(sql, new MapSqlParameterSource(namedParameters), new LongMapper()); + List entryNums = jdbcTemplate.query(sql, namedParameters, new LongMapper()); if (entryNums.size() > 0){ @@ -462,7 +463,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ namedParameters.put(QueryType.INSERT_MSG.getParamName(2), msg.getMsg().toByteArray()); KeyHolder keyHolder = new GeneratedKeyHolder(); - jdbcTemplate.update(sql,new MapSqlParameterSource(namedParameters),keyHolder); + jdbcTemplate.update(sql, new MapSqlParameterSource(namedParameters), keyHolder); entryNum = keyHolder.getKey().longValue(); @@ -785,6 +786,9 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ .build() ); + // Add timestamp to CompleteBatch + completeBatch.setTimestamp(message.getTimestamp()); + // Add actual batch data to CompleteBatch sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA); @@ -903,20 +907,20 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ long lastEntryNum = getLastMessageEntry(); - long checksum = 0; - Iterator queryIterator = syncQuery.getQueryList().iterator(); SingleSyncQuery currentQuery = queryIterator.next(); List messageStubs = readMessageStubs(syncQuery.getFilterList()); + Checksum checksum = new SimpleChecksum(); + for (BulletinBoardMessage message : messageStubs){ // Check for end of current query if (timestampComparator.compare(message.getMsg().getTimestamp(), currentQuery.getTimeOfSync()) > 0){ - if (checksum == currentQuery.getChecksum()){ + if (checksum.getChecksum() == currentQuery.getChecksum()){ lastTimeOfSync = currentQuery.getTimeOfSync(); } else { break; @@ -932,13 +936,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{ // Advance checksum - ByteString messageID = message.getMsg().getData(); + ByteString messageID = message.getMsg().getData(); // The data field contains the message ID - checksum &= messageID.byteAt(0) & messageID.byteAt(1) & messageID.byteAt(2) & messageID.byteAt(3); + checksum.update(messageID); } - if (checksum == currentQuery.getChecksum()){ + if (checksum.getChecksum() == currentQuery.getChecksum()){ lastTimeOfSync = currentQuery.getTimeOfSync(); } diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java index 1e7921d..affb1a3 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java @@ -637,6 +637,8 @@ public class GenericBulletinBoardServerTest { public void testSyncQuery() throws SignatureException, CommunicationException, IOException,NoSuchMethodException, IllegalAccessException, InvocationTargetException { + Checksum checksum = new SimpleChecksum(); + Timestamp timestamp = Timestamp.newBuilder() .setSeconds(1) .setNanos(0) @@ -687,7 +689,7 @@ public class GenericBulletinBoardServerTest { syncQuery = SyncQuery.newBuilder() .setFilterList(MessageFilterList.getDefaultInstance()) .addQuery(SingleSyncQuery.newBuilder() - .setChecksum(messageID.byteAt(0) & messageID.byteAt(1) & messageID.byteAt(2) & messageID.byteAt(3)) + .setChecksum(checksum.getChecksum(messageID)) .setTimeOfSync(timestamp) .build()) .build(); diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java b/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java new file mode 100644 index 0000000..b8ddb0b --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/Checksum.java @@ -0,0 +1,122 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import meerkat.crypto.Digest; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.protobuf.BulletinBoardAPI.MessageID; + +import java.util.Collection; + +/** + * Created by Arbel Deutsch Peled on 01-Mar-16. + * This interface is used to create checksums of Bulletin Board messages IDs + * This is useful in comparing database states + */ +public interface Checksum { + + /** + * Sets the Digest method which is used in creating message IDs from the messages themselves + * This method must be called with an initialized Digest before calling any methods that receive a parameter of type BulletinBoardMessage + * @param digest is the Digest that will be used to create message IDs from Bulletin Board Messages + */ + public void setDigest(Digest digest); + + /** + * Used to reset the current checksum state + */ + public void reset(); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(MessageID messageID); + + /** + * Update the current checksum with the given collection of IDs + * @param messageIDs contains the message IDs + */ + public void update(Collection messageIDs); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(ByteString messageID); + + /** + * Update the current checksum with the given ID + * @param messageID is the message ID to be added + */ + public void update(byte[] messageID); + + /** + * Update the current checksum with the message ID of the given message + * @param bulletinBoardMessage is the message whose ID should be added to the checksum + * @throws IllegalStateException if a Digest has not been set before calling this method + */ + public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException; + + + /** + * Update the current checksum with the message IDs of the given messages + * @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum + * @throws IllegalStateException if a Digest has not been set before calling this method + */ + public void digestAndUpdate(Collection bulletinBoardMessages) throws IllegalStateException; + + /** + * Returns the current checksum without changing the checksum state + * @return the current checksum + */ + public long getChecksum(); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(MessageID messageID); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(ByteString messageID); + + /** + * Updates the current checksum with the given ID and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageID is the message ID to be added + * @return the updated checksum + */ + public long getChecksum(byte[] messageID); + + /** + * Updates the current checksum with the given IDs and returns the resulting checksum + * The checksum is not reset afterwards + * @param messageIDs contains the message IDs to be added + * @return the updated checksum + */ + public long getChecksum(Collection messageIDs); + + /** + * Updates the current checksum with the message ID of the given message + * The checksum is not reset afterwards + * @param bulletinBoardMessage is the message whose ID should be added to the checksum + * @return the updated checksum + */ + public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException; + + /** + * Updates the current checksum with the message IDs of the given messages + * The checksum is not reset afterwards + * @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum + * @return the updated checksum + */ + public long digestAndGetChecksum(Collection bulletinBoardMessages) throws IllegalStateException; + +} diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java index 4f25f59..852bb24 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigest.java @@ -29,6 +29,8 @@ public class GenericBatchDigest implements BatchDigest{ update(batchData); } + update(completeBatch.getTimestamp()); + } @Override diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java index 7174b42..7327b8e 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/GenericBatchDigitalSignature.java @@ -32,10 +32,13 @@ public class GenericBatchDigitalSignature implements BatchDigitalSignature{ public void updateContent(CompleteBatch completeBatch) throws SignatureException { digitalSignature.updateContent(completeBatch.getBeginBatchMessage()); + for (BatchData batchData : completeBatch.getBatchDataList()) { digitalSignature.updateContent(batchData); } + digitalSignature.updateContent(completeBatch.getTimestamp()); + } @Override diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java b/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java new file mode 100644 index 0000000..90d7ae5 --- /dev/null +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/SimpleChecksum.java @@ -0,0 +1,131 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import meerkat.crypto.Digest; +import meerkat.protobuf.BulletinBoardAPI.MessageID; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; + +import java.util.Collection; + +/** + * Created by Arbel Deutsch Peled on 01-Mar-16. + * Implementation of Checksum via bitwise XOR of the bytes of message IDs + */ +public class SimpleChecksum implements Checksum{ + + private Digest digest; + private long checksum; + + public SimpleChecksum() { + digest = null; + reset(); + } + + @Override + public void setDigest(Digest digest) { + this.digest = digest; + } + + @Override + public void reset() { + checksum = 0; + } + + @Override + public void update(MessageID messageID) { + ByteString messageIDByteString = messageID.getID(); + update(messageIDByteString); + } + + @Override + public void update(Collection messageIDs) { + + for (MessageID messageID : messageIDs){ + update(messageID); + } + + } + + @Override + public void update(ByteString messageID) { + for (int i = 0 ; i < messageID.size() ; i++){ + checksum &= messageID.byteAt(i); + } + } + + @Override + public void update(byte[] messageID) { + for (int i = 0 ; i < messageID.length ; i++){ + checksum &= messageID[i]; + } + } + + private void checkDigest() throws IllegalStateException { + + if (digest == null){ + throw new IllegalStateException("Digest method not set. Use setDigest method before calling digestAndUpdate."); + } + + } + + @Override + public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException { + + checkDigest(); + + digest.reset(); + digest.update(bulletinBoardMessage); + update(digest.digest()); + + } + + @Override + public void digestAndUpdate(Collection bulletinBoardMessages) throws IllegalStateException { + + for (BulletinBoardMessage bulletinBoardMessage : bulletinBoardMessages){ + digestAndUpdate(bulletinBoardMessage); + } + + } + + @Override + public long getChecksum() { + return checksum; + } + + @Override + public long getChecksum(MessageID messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(ByteString messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(byte[] messageID) { + update(messageID); + return getChecksum(); + } + + @Override + public long getChecksum(Collection messageIDs) { + update(messageIDs); + return getChecksum(); + } + + @Override + public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException { + digestAndUpdate(bulletinBoardMessage); + return getChecksum(); + } + + @Override + public long digestAndGetChecksum(Collection bulletinBoardMessages) throws IllegalStateException { + digestAndUpdate(bulletinBoardMessages); + return getChecksum(); + } +} diff --git a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java index dc637d9..b1e5255 100644 --- a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java +++ b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java @@ -89,4 +89,10 @@ public class MessageInputStream implements Iterable{ } + public void close() throws IOException { + + in.close(); + + } + } diff --git a/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java b/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java index b55bc8e..f72c04c 100644 --- a/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java +++ b/meerkat-common/src/main/java/meerkat/comm/MessageOutputStream.java @@ -21,4 +21,8 @@ public class MessageOutputStream { message.writeDelimitedTo(out); } + public void close() throws IOException { + out.close(); + } + }