Local Client supports subsrciptions

Sync_Query
Arbel Deutsch Peled 2016-03-22 10:16:46 +02:00
parent a7699086d8
commit e56312d38b
4 changed files with 279 additions and 12 deletions

View File

@ -0,0 +1,231 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import meerkat.comm.CommunicationException;
import meerkat.crypto.concrete.ECDSASignature;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.util.BulletinBoardMessageComparator;
import meerkat.util.BulletinBoardMessageGenerator;
import java.io.IOException;
import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.*;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Created by Arbel Deutsch Peled on 22-Mar-16.
*/
public class GenericSubscriptionClientTester {
private GenericBatchDigitalSignature signers[];
private ByteString[] signerIDs;
private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12";
private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12";
private static String KEYFILE_PASSWORD1 = "secret";
private static String KEYFILE_PASSWORD3 = "shh";
private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt";
private SubscriptionAsyncBulletinBoardClient bulletinBoardClient;
private Random random;
private BulletinBoardMessageGenerator generator;
private Semaphore jobSemaphore;
private Vector<Throwable> thrown;
public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){
this.bulletinBoardClient = bulletinBoardClient;
signers = new GenericBatchDigitalSignature[2];
signerIDs = new ByteString[signers.length];
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
signers[1] = new GenericBatchDigitalSignature(new ECDSASignature());
InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE);
char[] password = KEYFILE_PASSWORD1.toCharArray();
KeyStore.Builder keyStoreBuilder;
try {
keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password);
signers[0].loadSigningCertificate(keyStoreBuilder);
signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE));
keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3);
password = KEYFILE_PASSWORD3.toCharArray();
keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password);
signers[1].loadSigningCertificate(keyStoreBuilder);
signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE));
for (int i = 0 ; i < signers.length ; i++) {
signerIDs[i] = signers[i].getSignerID();
}
} catch (IOException e) {
System.err.println("Failed reading from signature file " + e.getMessage());
fail("Failed reading from signature file " + e.getMessage());
} catch (CertificateException e) {
System.err.println("Failed reading certificate " + e.getMessage());
fail("Failed reading certificate " + e.getMessage());
} catch (KeyStoreException e) {
System.err.println("Failed reading keystore " + e.getMessage());
fail("Failed reading keystore " + e.getMessage());
} catch (NoSuchAlgorithmException e) {
System.err.println("Couldn't find signing algorithm " + e.getMessage());
fail("Couldn't find signing algorithm " + e.getMessage());
} catch (UnrecoverableKeyException e) {
System.err.println("Couldn't find signing key " + e.getMessage());
fail("Couldn't find signing key " + e.getMessage());
}
}
/**
* Takes care of initializing the client and the test resources
*/
public void init(){
random = new Random(0); // We use insecure randomness in tests for repeatability
generator = new BulletinBoardMessageGenerator(random);
thrown = new Vector<>();
jobSemaphore = new Semaphore(0);
}
/**
* Closes the client and makes sure the test fails when an exception occurred in a separate thread
*/
public void close() {
if (thrown.size() > 0) {
assert false;
}
}
private class SubscriptionCallback implements FutureCallback<List<BulletinBoardMessage>>{
private int stage;
private final List<List<BulletinBoardMessage>> expectedMessages;
private final List<BulletinBoardMessage> messagesToPost;
private final BulletinBoardMessageComparator comparator;
public SubscriptionCallback(List<List<BulletinBoardMessage>> expectedMessages, List<BulletinBoardMessage> messagesToPost) {
this.expectedMessages = expectedMessages;
this.messagesToPost = messagesToPost;
this.stage = 0;
this.comparator = new BulletinBoardMessageComparator();
}
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
if (stage >= expectedMessages.size())
return;
// Check for consistency
List<BulletinBoardMessage> expectedMsgList = expectedMessages.get(stage);
if (expectedMsgList.size() != result.size()){
onFailure(new AssertionError("Received wrong number of messages"));
return;
}
Iterator<BulletinBoardMessage> expectedMessageIterator = expectedMsgList.iterator();
Iterator<BulletinBoardMessage> receivedMessageIterator = result.iterator();
while (expectedMessageIterator.hasNext()) {
if(comparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()) != 0){
onFailure(new AssertionError("Received unexpected message"));
return;
}
}
// Post new message
try {
if (stage < messagesToPost.size()) {
bulletinBoardClient.postMessage(messagesToPost.get(stage));
}
} catch (CommunicationException e) {
onFailure(e);
return;
}
stage++;
jobSemaphore.release();
}
@Override
public void onFailure(Throwable t) {
System.err.println(t.getCause() + " " + t.getMessage());
thrown.add(t);
jobSemaphore.release(expectedMessages.size());
stage = expectedMessages.size();
}
}
public void subscriptionTest() throws SignatureException, CommunicationException {
final int FIRST_POST_ID = 201;
final int SECOND_POST_ID = 202;
final String COMMON_TAG = "SUBSCRIPTION_TEST";
List<String> tags = new LinkedList<>();
tags.add(COMMON_TAG);
BulletinBoardMessage msg1 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(1000).setNanos(900).build(), 10, 4, tags);
BulletinBoardMessage msg2 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(800).setNanos(300).build(), 10, 4);
BulletinBoardMessage msg3 = generator.generateRandomMessage(signers, Timestamp.newBuilder().setSeconds(2000).setNanos(0).build(), 10, 4, tags);
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.TAG)
.setTag(COMMON_TAG)
.build())
.build();
List<List<BulletinBoardMessage>> expectedMessages = new ArrayList<>(3);
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
expectedMessages.add(new LinkedList<BulletinBoardMessage>());
expectedMessages.get(0).add(msg1);
expectedMessages.get(2).add(msg3);
List<BulletinBoardMessage> messagesToPost = new ArrayList<>(2);
messagesToPost.add(msg2);
messagesToPost.add(msg3);
bulletinBoardClient.postMessage(msg1);
bulletinBoardClient.subscribe(filterList, new SubscriptionCallback(expectedMessages, messagesToPost));
try {
jobSemaphore.acquire(3);
} catch (InterruptedException e) {
System.err.println(e.getCause() + " " + e.getMessage());
}
}
}

View File

@ -22,8 +22,11 @@ public class LocalBulletinBoardClientTest {
private static final int THREAD_NUM = 3;
private static final String DB_NAME = "TestDB";
// Tester
private static final int SUBSRCIPTION_DELAY = 3000;
// Testers
private GenericBulletinBoardClientTester clientTest;
private GenericSubscriptionClientTester subscriptionTester;
public LocalBulletinBoardClientTest() throws CommunicationException {
@ -48,8 +51,8 @@ public class LocalBulletinBoardClientTest {
BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider);
server.init(DB_NAME);
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM);
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY);
subscriptionTester = new GenericSubscriptionClientTester(client);
clientTest = new GenericBulletinBoardClientTester(client);
}
@ -104,4 +107,12 @@ public class LocalBulletinBoardClientTest {
}
@Test
public void testSubscription() throws SignatureException, CommunicationException {
subscriptionTester.init();
subscriptionTester.subscriptionTest();
subscriptionTester.close();
}
}

View File

@ -158,10 +158,10 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
+ " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)";
case BEFORE_TIME:
return "MsgTable.ExactTime <= :TimeStamp";
return "MsgTable.ExactTime <= :TimeStamp" + serialString;
case AFTER_TIME:
return "MsgTable.ExactTime >= :TimeStamp";
return "MsgTable.ExactTime >= :TimeStamp" + serialString;
default:
throw new IllegalArgumentException("Cannot serve a filter of type " + filterType);
@ -186,6 +186,11 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
case TAG:
return "VARCHAR";
case AFTER_TIME: // Go through
case BEFORE_TIME:
return "TIMESTAMP";
default:
throw new IllegalArgumentException("Cannot serve a filter of type " + filterType);
}

View File

@ -8,6 +8,8 @@ import com.google.protobuf.Timestamp;
import java.math.BigInteger;
import java.security.SignatureException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
/**
@ -36,35 +38,36 @@ public class BulletinBoardMessageGenerator {
* @param timestamp contains the time used in the message
* @param dataSize is the length of the data contained in the message
* @param tagNumber is the number of tags to generate
* @param tags is a list of initial tags (on top of which more will be added according to the method input)
* @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp
*/
public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber)
throws SignatureException {
public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber, List<String> tags)
throws SignatureException{
// Generate random data.
byte[] data = new byte[dataSize];
String[] tags = new String[tagNumber];
String[] newTags = new String[tagNumber];
for (int i = 0; i < dataSize; i++) {
data[i] = randomByte();
}
for (int i = 0; i < tagNumber; i++) {
tags[i] = randomString();
newTags[i] = randomString();
}
UnsignedBulletinBoardMessage unsignedMessage =
UnsignedBulletinBoardMessage.newBuilder()
.setData(ByteString.copyFrom(data))
.setTimestamp(timestamp)
.addAllTag(Arrays.asList(tags))
.addAllTag(tags)
.addAllTag(Arrays.asList(newTags))
.build();
BulletinBoardMessage.Builder messageBuilder =
BulletinBoardMessage.newBuilder()
.setMsg(unsignedMessage);
.setMsg(unsignedMessage);
for (int i = 0 ; i < signers.length ; i++) {
signers[i].updateContent(unsignedMessage);
@ -75,6 +78,23 @@ public class BulletinBoardMessageGenerator {
}
/**
* Generates a complete instance of a BulletinBoardMessage
* @param signers contains the (possibly multiple) credentials required to sign the message
* @param timestamp contains the time used in the message
* @param dataSize is the length of the data contained in the message
* @param tagNumber is the number of tags to generate
* @return a random, signed Bulletin Board Message containing random data and tags and the given timestamp
*/
public BulletinBoardMessage generateRandomMessage(DigitalSignature[] signers, Timestamp timestamp, int dataSize, int tagNumber)
throws SignatureException {
List<String> tags = new LinkedList<>();
return generateRandomMessage(signers, timestamp, dataSize, tagNumber, tags);
}
/**
* Generates a complete instance of a BulletinBoardMessage
* @param signers contains the (possibly multiple) credentials required to sign the message