diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java index b581b87..b64412d 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/SQLiteQueryProvider.java @@ -4,6 +4,7 @@ import meerkat.protobuf.BulletinBoardAPI.*; import org.sqlite.SQLiteDataSource; import javax.sql.DataSource; +import java.text.MessageFormat; import java.util.LinkedList; import java.util.List; @@ -25,19 +26,97 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi switch(queryType) { case ADD_SIGNATURE: return "INSERT OR IGNORE INTO SignatureTable (EntryNum, SignerId, Signature) VALUES (:EntryNum,:SignerId,:Signature)"; + case CONNECT_TAG: return "INSERT OR IGNORE INTO MsgTagTable (TagId, EntryNum)" + " SELECT TagTable.TagId, :EntryNum AS EntryNum FROM TagTable WHERE Tag = :Tag"; + case FIND_MSG_ID: return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId"; + + case FIND_TAG_ID: + return MessageFormat.format( + "SELECT TagId FROM TagTable WHERE Tag = :{0}", + QueryType.FIND_TAG_ID.getParamName(0)); + case GET_MESSAGES: return "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable"; + + case COUNT_MESSAGES: + return "SELECT COUNT(MsgTable.EntryNum) FROM MsgTable"; + + case GET_MESSAGE_STUBS: + return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable"; + case GET_SIGNATURES: return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum"; + case INSERT_MSG: return "INSERT INTO MsgTable (MsgId, Msg) VALUES(:MsgId,:Msg)"; + case INSERT_NEW_TAG: return "INSERT OR IGNORE INTO TagTable(Tag) VALUES (:Tag)"; + + case GET_LAST_MESSAGE_ENTRY: + return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable"; + + case GET_BATCH_MESSAGE_ENTRY: + return MessageFormat.format( + "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable" + + " INNER JOIN SignatureTable ON MsgTable.EntryNum = SignatureTable.EntryNum" + + " INNER JOIN MsgTagTable ON MsgTable.EntryNum = MsgTagTable.EntryNum" + + " INNER JOIN TagTable ON MsgTagTable.TagId = TagTable.TagId" + + " WHERE SignatureTable.SignerId = :{0}" + + " AND TagTable.Tag = :{1}", + QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(0), + QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(1)); + + case GET_BATCH_MESSAGE_DATA: + return MessageFormat.format( + "SELECT Data FROM BatchTable" + + " WHERE SignerId = :{0} AND BatchId = :{1} AND SerialNum >= :{2}" + + " ORDER BY SerialNum ASC", + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0), + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1), + QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2)); + + case INSERT_BATCH_DATA: + return MessageFormat.format( + "INSERT INTO BatchTable (SignerId, BatchId, SerialNum, Data)" + + " VALUES (:{0}, :{1}, :{2}, :{3})", + QueryType.INSERT_BATCH_DATA.getParamName(0), + QueryType.INSERT_BATCH_DATA.getParamName(1), + QueryType.INSERT_BATCH_DATA.getParamName(2), + QueryType.INSERT_BATCH_DATA.getParamName(3)); + + case CHECK_BATCH_LENGTH: + return MessageFormat.format( + "SELECT COUNT(Data) AS BatchLength FROM BatchTable" + + " WHERE SignerId = :{0} AND BatchId = :{1}", + QueryType.CHECK_BATCH_LENGTH.getParamName(0), + QueryType.CHECK_BATCH_LENGTH.getParamName(1)); + + case CONNECT_BATCH_TAG: + return MessageFormat.format( + "INSERT INTO BatchTagTable (SignerId, BatchId, TagId) SELECT :{0}, :{1}, TagId FROM TagTable" + + " WHERE Tag = :{2}", + QueryType.CONNECT_BATCH_TAG.getParamName(0), + QueryType.CONNECT_BATCH_TAG.getParamName(1), + QueryType.CONNECT_BATCH_TAG.getParamName(2)); + + case GET_BATCH_TAGS: + return MessageFormat.format( + "SELECT Tag FROM TagTable INNER JOIN BatchTagTable ON TagTable.TagId = BatchTagTable.TagId" + + " WHERE SignerId = :{0} AND BatchId = :{1} ORDER BY Tag ASC", + QueryType.GET_BATCH_TAGS.getParamName(0), + QueryType.GET_BATCH_TAGS.getParamName(1)); + + case REMOVE_BATCH_TAGS: + return MessageFormat.format( + "DELETE FROM BatchTagTable WHERE SignerId = :{0} AND BatchId = :{1}", + QueryType.REMOVE_BATCH_TAGS.getParamName(0), + QueryType.REMOVE_BATCH_TAGS.getParamName(1)); + default: throw new IllegalArgumentException("Cannot serve a query of type " + queryType); } @@ -52,21 +131,34 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi switch(filterType) { case EXACT_ENTRY: return "MsgTable.EntryNum = :EntryNum" + serialString; + case MAX_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MIN_ENTRY: return "MsgTable.EntryNum <= :EntryNum" + serialString; + case MAX_MESSAGES: return "LIMIT = :Limit" + serialString; + case MSG_ID: return "MsgTable.MsgId = :MsgId" + serialString; + case SIGNER_ID: return "EXISTS (SELECT 1 FROM SignatureTable" + " WHERE SignatureTable.SignerId = :SignerId" + serialString + " AND SignatureTable.EntryNum = MsgTable.EntryNum)"; + case TAG: return "EXISTS (SELECT 1 FROM TagTable" + " INNER JOIN MsgTagTable ON TagTable.TagId = MsgTagTable.TagId" + " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)"; + + case BEFORE_TIME: + return "MsgTable.ExactTime <= :TimeStamp"; + + case AFTER_TIME: + return "MsgTable.ExactTime >= :TimeStamp"; + default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); } diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java index e33c739..1e7921d 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/GenericBulletinBoardServerTest.java @@ -24,9 +24,12 @@ import meerkat.comm.CommunicationException; import meerkat.comm.MessageInputStream; import meerkat.comm.MessageOutputStream; import meerkat.comm.MessageInputStream.MessageInputStreamFactory; +import meerkat.crypto.Digest; import meerkat.crypto.concrete.ECDSASignature; +import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; -import meerkat.util.BulletinBoardUtils; +import meerkat.util.BulletinBoardMessageGenerator; +import org.h2.util.DateTimeUtils; import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; @@ -59,6 +62,10 @@ public class GenericBulletinBoardServerTest { private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); // Used to time the tests + private BulletinBoardMessageGenerator bulletinBoardMessageGenerator; + + private Digest digest; + /** * @param bulletinBoardServer is an initialized server. * @throws InstantiationException @@ -121,8 +128,12 @@ public class GenericBulletinBoardServerTest { System.err.println("Couldn't find signing key " + e.getMessage()); fail("Couldn't find signing key " + e.getMessage()); } - - random = new Random(0); // We use insecure randomness in tests for repeatability + + // We use insecure randomness in tests for repeatability + random = new Random(0); + bulletinBoardMessageGenerator = new BulletinBoardMessageGenerator(random); + + digest = new SHA256Digest(); long end = threadBean.getCurrentThreadCpuTime(); System.err.println("Finished initializing GenericBulletinBoardServerTest"); @@ -182,7 +193,10 @@ public class GenericBulletinBoardServerTest { for (i = 1; i <= MESSAGE_NUM; i++) { unsignedMsgBuilder = UnsignedBulletinBoardMessage.newBuilder() .setData(ByteString.copyFrom(data[i - 1])) - .setTimestamp(BulletinBoardUtils.toTimestampProto()); + .setTimestamp(Timestamp.newBuilder() + .setSeconds(i) + .setNanos(i) + .build()); // Add tags based on bit-representation of message number. @@ -619,6 +633,72 @@ public class GenericBulletinBoardServerTest { } } + + public void testSyncQuery() + throws SignatureException, CommunicationException, IOException,NoSuchMethodException, IllegalAccessException, InvocationTargetException { + + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(1) + .setNanos(0) + .build(); + + BulletinBoardMessage newMessage = bulletinBoardMessageGenerator.generateRandomMessage(signers, timestamp, 10, 10); + + BoolMsg result = bulletinBoardServer.postMessage(newMessage); + assertThat("Failed to post message to BB Server", result.getValue(), is(true)); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + digest.update(newMessage.getMsg()); + + ByteString messageID = ByteString.copyFrom(digest.digest()); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(messageID) + .build()) + .build(); + + bulletinBoardServer.readMessages(filterList, new MessageOutputStream(outputStream)); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream(new ByteArrayInputStream( + outputStream.toByteArray()), + BulletinBoardMessage.class); + + long lastEntry = inputStream.asList().get(0).getEntryNum(); + + SyncQuery syncQuery = SyncQuery.newBuilder() + .setFilterList(MessageFilterList.getDefaultInstance()) + .addQuery(SingleSyncQuery.newBuilder() + .setChecksum(2) + .setTimeOfSync(Timestamp.newBuilder() + .setSeconds(2) + .setNanos(0) + .build()) + .build()) + .build(); + + SyncQueryResponse queryResponse = bulletinBoardServer.querySync(syncQuery); + + assertThat("Sync query replies with positive sync when no sync was expected", queryResponse.getLastEntryNum(), is(equalTo(-1l))); + + syncQuery = SyncQuery.newBuilder() + .setFilterList(MessageFilterList.getDefaultInstance()) + .addQuery(SingleSyncQuery.newBuilder() + .setChecksum(messageID.byteAt(0) & messageID.byteAt(1) & messageID.byteAt(2) & messageID.byteAt(3)) + .setTimeOfSync(timestamp) + .build()) + .build(); + + queryResponse = bulletinBoardServer.querySync(syncQuery); + + assertThat("Sync query reply contained wrong last entry number", lastEntry, is(equalTo(queryResponse.getLastEntryNum()))); + + assertThat("Sync query reply contained wrong timestamp", timestamp, is(equalTo(queryResponse.getLastTimeOfSync()))); + + } public void close(){ signers[0].clearSigningKey(); diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java index 42c11f7..273a53f 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/MySQLBulletinBoardServerTest.java @@ -8,8 +8,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationTargetException; +import java.security.SignatureException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -144,6 +147,16 @@ public class MySQLBulletinBoardServerTest { } + @Test + public void testSyncQuery() { + try { + serverTest.testSyncQuery(); + } catch (Exception e) { + System.err.println(e.getMessage()); + fail(e.getMessage()); + } + } + @After public void close() { System.err.println("Starting to close MySQLBulletinBoardServerTest"); diff --git a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java index 33deb3f..dc637d9 100644 --- a/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java +++ b/meerkat-common/src/main/java/meerkat/comm/MessageInputStream.java @@ -5,6 +5,7 @@ import com.google.protobuf.Message; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -12,7 +13,7 @@ import java.util.List; * Created by Arbel Deutsch Peled on 21-Feb-16. * A input stream of Protobuf messages */ -public class MessageInputStream{ +public class MessageInputStream implements Iterable{ private T.Builder builder; @@ -23,6 +24,33 @@ public class MessageInputStream{ this.builder = (T.Builder) type.getMethod("newBuilder").invoke(type); } + @Override + public Iterator iterator() { + + return new Iterator() { + + @Override + public boolean hasNext() { + try { + return isAvailable(); + } catch (IOException e) { + return false; + } + } + + @Override + public T next() { + try { + return readMessage(); + } catch (IOException e) { + return null; + } + } + + }; + + } + /** * Factory class for actually creating a MessageInputStream */ diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java index d8b362c..8793b2d 100644 --- a/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardUtils.java @@ -80,7 +80,7 @@ public class BulletinBoardUtils { * This method creates a Timestamp Protobuf from the current system time * @return a Timestamp Protobuf encoding of the current system time */ - public static com.google.protobuf.Timestamp toTimestampProto() { + public static com.google.protobuf.Timestamp getCurrentTimestampProto() { return toTimestampProto(System.currentTimeMillis());