From e56312d38bae20ae1a02726f23eeef9d222eb02b Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Tue, 22 Mar 2016 10:16:46 +0200 Subject: [PATCH] Local Client supports subsrciptions --- .../GenericSubscriptionClientTester.java | 231 ++++++++++++++++++ .../LocalBulletinBoardClientTest.java | 17 +- .../sqlserver/H2QueryProvider.java | 9 +- .../util/BulletinBoardMessageGenerator.java | 34 ++- 4 files changed, 279 insertions(+), 12 deletions(-) create mode 100644 bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java new file mode 100644 index 0000000..a91f8d6 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericSubscriptionClientTester.java @@ -0,0 +1,231 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import meerkat.comm.CommunicationException; +import meerkat.crypto.concrete.ECDSASignature; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardMessageGenerator; + +import java.io.IOException; +import java.io.InputStream; +import java.security.*; +import java.security.cert.CertificateException; +import java.util.*; +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Created by Arbel Deutsch Peled on 22-Mar-16. + */ +public class GenericSubscriptionClientTester { + + private GenericBatchDigitalSignature signers[]; + private ByteString[] signerIDs; + + private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; + private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12"; + + private static String KEYFILE_PASSWORD1 = "secret"; + private static String KEYFILE_PASSWORD3 = "shh"; + + private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; + private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; + + private SubscriptionAsyncBulletinBoardClient bulletinBoardClient; + + private Random random; + private BulletinBoardMessageGenerator generator; + + private Semaphore jobSemaphore; + private Vector thrown; + + public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){ + + this.bulletinBoardClient = bulletinBoardClient; + + signers = new GenericBatchDigitalSignature[2]; + signerIDs = new ByteString[signers.length]; + signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + + InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); + char[] password = KEYFILE_PASSWORD1.toCharArray(); + + KeyStore.Builder keyStoreBuilder; + try { + keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); + + signers[0].loadSigningCertificate(keyStoreBuilder); + + signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); + + keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); + password = KEYFILE_PASSWORD3.toCharArray(); + + keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); + signers[1].loadSigningCertificate(keyStoreBuilder); + + signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); + + for (int i = 0 ; i < signers.length ; i++) { + signerIDs[i] = signers[i].getSignerID(); + } + + } catch (IOException e) { + System.err.println("Failed reading from signature file " + e.getMessage()); + fail("Failed reading from signature file " + e.getMessage()); + } catch (CertificateException e) { + System.err.println("Failed reading certificate " + e.getMessage()); + fail("Failed reading certificate " + e.getMessage()); + } catch (KeyStoreException e) { + System.err.println("Failed reading keystore " + e.getMessage()); + fail("Failed reading keystore " + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + System.err.println("Couldn't find signing algorithm " + e.getMessage()); + fail("Couldn't find signing algorithm " + e.getMessage()); + } catch (UnrecoverableKeyException e) { + System.err.println("Couldn't find signing key " + e.getMessage()); + fail("Couldn't find signing key " + e.getMessage()); + } + + } + + /** + * Takes care of initializing the client and the test resources + */ + public void init(){ + + random = new Random(0); // We use insecure randomness in tests for repeatability + generator = new BulletinBoardMessageGenerator(random); + + thrown = new Vector<>(); + jobSemaphore = new Semaphore(0); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + public void close() { + + if (thrown.size() > 0) { + assert false; + } + + } + + private class SubscriptionCallback implements FutureCallback>{ + + private int stage; + private final List> expectedMessages; + private final List messagesToPost; + private final BulletinBoardMessageComparator comparator; + + public SubscriptionCallback(List> expectedMessages, List messagesToPost) { + + this.expectedMessages = expectedMessages; + this.messagesToPost = messagesToPost; + this.stage = 0; + this.comparator = new BulletinBoardMessageComparator(); + + } + + @Override + public void onSuccess(List result) { + + if (stage >= expectedMessages.size()) + return; + + // Check for consistency + + List expectedMsgList = expectedMessages.get(stage); + + if (expectedMsgList.size() != result.size()){ + onFailure(new AssertionError("Received wrong number of messages")); + return; + } + + Iterator expectedMessageIterator = expectedMsgList.iterator(); + Iterator receivedMessageIterator = result.iterator(); + + while (expectedMessageIterator.hasNext()) { + if(comparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()) != 0){ + onFailure(new AssertionError("Received unexpected message")); + return; + } + } + + // Post new message + try { + if (stage < messagesToPost.size()) { + bulletinBoardClient.postMessage(messagesToPost.get(stage)); + } + } catch (CommunicationException e) { + onFailure(e); + return; + } + + stage++; + jobSemaphore.release(); + } + + @Override + public void onFailure(Throwable t) { + System.err.println(t.getCause() + " " + t.getMessage()); + thrown.add(t); + jobSemaphore.release(expectedMessages.size()); + stage = expectedMessages.size(); + } + } + + public void subscriptionTest() throws SignatureException, CommunicationException { + + final int FIRST_POST_ID = 201; + final int SECOND_POST_ID = 202; + final String COMMON_TAG = "SUBSCRIPTION_TEST"; + + List tags = new LinkedList<>(); + tags.add(COMMON_TAG); + + BulletinBoardMessage msg1 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(1000).setNanos(900).build(), 10, 4, tags); + BulletinBoardMessage msg2 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(800).setNanos(300).build(), 10, 4); + BulletinBoardMessage msg3 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(2000).setNanos(0).build(), 10, 4, tags); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(COMMON_TAG) + .build()) + .build(); + + List> expectedMessages = new ArrayList<>(3); + expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList()); + expectedMessages.add(new LinkedList()); + expectedMessages.get(0).add(msg1); + expectedMessages.get(2).add(msg3); + + List messagesToPost = new ArrayList<>(2); + messagesToPost.add(msg2); + messagesToPost.add(msg3); + + bulletinBoardClient.postMessage(msg1); + bulletinBoardClient.subscribe(filterList, new SubscriptionCallback(expectedMessages, messagesToPost)); + + try { + jobSemaphore.acquire(3); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java index 2e0e0af..d2039ae 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/LocalBulletinBoardClientTest.java @@ -22,8 +22,11 @@ public class LocalBulletinBoardClientTest { private static final int THREAD_NUM = 3; private static final String DB_NAME = "TestDB"; - // Tester + private static final int SUBSRCIPTION_DELAY = 3000; + + // Testers private GenericBulletinBoardClientTester clientTest; + private GenericSubscriptionClientTester subscriptionTester; public LocalBulletinBoardClientTest() throws CommunicationException { @@ -48,8 +51,8 @@ public class LocalBulletinBoardClientTest { BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); server.init(DB_NAME); - LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM); - + LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); + subscriptionTester = new GenericSubscriptionClientTester(client); clientTest = new GenericBulletinBoardClientTester(client); } @@ -104,4 +107,12 @@ public class LocalBulletinBoardClientTest { } + @Test + public void testSubscription() throws SignatureException, CommunicationException { + subscriptionTester.init(); + subscriptionTester.subscriptionTest(); + subscriptionTester.close(); + + } + } diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java index c390048..a54c2ff 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/H2QueryProvider.java @@ -158,10 +158,10 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider + " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)"; case BEFORE_TIME: - return "MsgTable.ExactTime <= :TimeStamp"; + return "MsgTable.ExactTime <= :TimeStamp" + serialString; case AFTER_TIME: - return "MsgTable.ExactTime >= :TimeStamp"; + return "MsgTable.ExactTime >= :TimeStamp" + serialString; default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); @@ -186,6 +186,11 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider case TAG: return "VARCHAR"; + case AFTER_TIME: // Go through + case BEFORE_TIME: + return "TIMESTAMP"; + + default: throw new IllegalArgumentException("Cannot serve a filter of type " + filterType); } diff --git a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java index 5ca3e0b..dff562e 100644 --- a/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java +++ b/meerkat-common/src/main/java/meerkat/util/BulletinBoardMessageGenerator.java @@ -8,6 +8,8 @@ import com.google.protobuf.Timestamp; import java.math.BigInteger; import java.security.SignatureException; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Random; /** @@ -36,35 +38,36 @@ public class BulletinBoardMessageGenerator { * @param timestamp contains the time used in the message * @param dataSize is the length of the data contained in the message * @param tagNumber is the number of tags to generate + * @param tags is a list of initial tags (on top of which more will be added according to the method input) * @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp */ - - public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber) - throws SignatureException { + public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber, List tags) + throws SignatureException{ // Generate random data. byte[] data = new byte[dataSize]; - String[] tags = new String[tagNumber]; + String[] newTags = new String[tagNumber]; for (int i = 0; i < dataSize; i++) { data[i] = randomByte(); } for (int i = 0; i < tagNumber; i++) { - tags[i] = randomString(); + newTags[i] = randomString(); } UnsignedBulletinBoardMessage unsignedMessage = UnsignedBulletinBoardMessage.newBuilder() .setData(ByteString.copyFrom(data)) .setTimestamp(timestamp) - .addAllTag(Arrays.asList(tags)) + .addAllTag(tags) + .addAllTag(Arrays.asList(newTags)) .build(); BulletinBoardMessage.Builder messageBuilder = BulletinBoardMessage.newBuilder() - .setMsg(unsignedMessage); + .setMsg(unsignedMessage); for (int i = 0 ; i < signers.length ; i++) { signers[i].updateContent(unsignedMessage); @@ -75,6 +78,23 @@ public class BulletinBoardMessageGenerator { } + /** + * Generates a complete instance of a BulletinBoardMessage + * @param signers contains the (possibly multiple) credentials required to sign the message + * @param timestamp contains the time used in the message + * @param dataSize is the length of the data contained in the message + * @param tagNumber is the number of tags to generate + * @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp + */ + + public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber) + throws SignatureException { + + List tags = new LinkedList<>(); + return generateRandomMessage(signers, timestamp, dataSize, tagNumber, tags); + + } + /** * Generates a complete instance of a BulletinBoardMessage * @param signers contains the (possibly multiple) credentials required to sign the message