import com.google.common.util.concurrent.FutureCallback; import com.google.protobuf.*; import com.google.protobuf.Timestamp; import meerkat.bulletinboard.AsyncBulletinBoardClient; import meerkat.bulletinboard.CompleteBatch; import meerkat.bulletinboard.GenericBatchDigitalSignature; import meerkat.bulletinboard.ThreadedBulletinBoardClient; import meerkat.comm.CommunicationException; import meerkat.crypto.concrete.ECDSASignature; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; import meerkat.protobuf.Voting.*; import meerkat.util.BulletinBoardMessageComparator; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.number.OrderingComparison.*; import java.io.IOException; import java.io.InputStream; import java.security.*; import java.security.cert.CertificateException; import java.util.*; import java.util.concurrent.Semaphore; /** * Created by Arbel Deutsch Peled on 05-Dec-15. */ public class ThreadedBulletinBoardClientIntegrationTest { // Signature resources private GenericBatchDigitalSignature signers[]; private ByteString[] signerIDs; private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12"; private static String KEYFILE_PASSWORD1 = "secret"; private static String KEYFILE_PASSWORD3 = "shh"; private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; // Server data private static String PROP_GETTY_URL = "gretty.httpBaseURI"; private static String DEFAULT_BASE_URL = "http://localhost:8081"; private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); // Client and callbacks private AsyncBulletinBoardClient bulletinBoardClient; private PostCallback postCallback; private PostCallback failPostCallback = new PostCallback(true,false); private RedundancyCallback redundancyCallback; private ReadCallback readCallback; private ReadBatchCallback readBatchCallback; // Sync and misc private Semaphore jobSemaphore; private Vector thrown; private Random random; // Constructor public ThreadedBulletinBoardClientIntegrationTest(){ signers = new GenericBatchDigitalSignature[2]; signerIDs = new ByteString[signers.length]; signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); char[] password = KEYFILE_PASSWORD1.toCharArray(); KeyStore.Builder keyStoreBuilder; try { keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); signers[0].loadSigningCertificate(keyStoreBuilder); signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); password = KEYFILE_PASSWORD3.toCharArray(); keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); signers[1].loadSigningCertificate(keyStoreBuilder); signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); for (int i = 0 ; i < signers.length ; i++) { signerIDs[i] = signers[i].getSignerID(); } } catch (IOException e) { System.err.println("Failed reading from signature file " + e.getMessage()); fail("Failed reading from signature file " + e.getMessage()); } catch (CertificateException e) { System.err.println("Failed reading certificate " + e.getMessage()); fail("Failed reading certificate " + e.getMessage()); } catch (KeyStoreException e) { System.err.println("Failed reading keystore " + e.getMessage()); fail("Failed reading keystore " + e.getMessage()); } catch (NoSuchAlgorithmException e) { System.err.println("Couldn't find signing algorithm " + e.getMessage()); fail("Couldn't find signing algorithm " + e.getMessage()); } catch (UnrecoverableKeyException e) { System.err.println("Couldn't find signing key " + e.getMessage()); fail("Couldn't find signing key " + e.getMessage()); } } // Callback definitions protected void genericHandleFailure(Throwable t){ System.err.println(t.getCause() + " " + t.getMessage()); thrown.add(t); jobSemaphore.release(); } private class PostCallback implements FutureCallback{ private boolean isAssert; private boolean assertValue; public PostCallback() { this(false); } public PostCallback(boolean isAssert) { this(isAssert,true); } public PostCallback(boolean isAssert, boolean assertValue) { this.isAssert = isAssert; this.assertValue = assertValue; } @Override public void onSuccess(Boolean msg) { System.err.println("Post operation completed"); jobSemaphore.release(); //TODO: Change Assert mechanism to exception one if (isAssert) { if (assertValue) { assertThat("Post operation failed", msg, is(Boolean.TRUE)); } else { assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE)); } } } @Override public void onFailure(Throwable t) { genericHandleFailure(t); } } private class RedundancyCallback implements FutureCallback{ private float minRedundancy; public RedundancyCallback(float minRedundancy) { this.minRedundancy = minRedundancy; } @Override public void onSuccess(Float redundancy) { System.err.println("Redundancy found is: " + redundancy); jobSemaphore.release(); assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); } @Override public void onFailure(Throwable t) { genericHandleFailure(t); } } private class ReadCallback implements FutureCallback>{ private List expectedMsgList; public ReadCallback(List expectedMsgList) { this.expectedMsgList = expectedMsgList; } @Override public void onSuccess(List messages) { System.err.println(messages); jobSemaphore.release(); BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); assertThat(messages.size(), is(expectedMsgList.size())); Iterator expectedMessageIterator = expectedMsgList.iterator(); Iterator receivedMessageIterator = messages.iterator(); while (expectedMessageIterator.hasNext()) { assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); } } @Override public void onFailure(Throwable t) { genericHandleFailure(t); } } private class ReadBatchCallback implements FutureCallback { private CompleteBatch expectedBatch; public ReadBatchCallback(CompleteBatch expectedBatch) { this.expectedBatch = expectedBatch; } @Override public void onSuccess(CompleteBatch batch) { System.err.println(batch); jobSemaphore.release(); assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch))); } @Override public void onFailure(Throwable t) { genericHandleFailure(t); } } // Randomness generators private byte randomByte(){ return (byte) random.nextInt(); } private byte[] randomByteArray(int length) { byte[] randomBytes = new byte[length]; for (int i = 0; i < length ; i++){ randomBytes[i] = randomByte(); } return randomBytes; } private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException { CompleteBatch completeBatch = new CompleteBatch(); // Create data completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() .setSignerId(signerIDs[signer]) .setBatchId(batchId) .addTag("Test") .build()); for (int i = 0 ; i < length ; i++){ BatchData batchData = BatchData.newBuilder() .setData(ByteString.copyFrom(randomByteArray(i))) .build(); completeBatch.appendBatchData(batchData); } completeBatch.setTimestamp(Timestamp.newBuilder() .setSeconds(Math.abs(90)) .setNanos(50) .build()); signers[signer].updateContent(completeBatch); completeBatch.setSignature(signers[signer].sign()); return completeBatch; } // Test methods /** * Takes care of initializing the client and the test resources */ @Before public void init(){ bulletinBoardClient = new ThreadedBulletinBoardClient(); random = new Random(0); // We use insecure randomness in tests for repeatability List testDB = new LinkedList<>(); testDB.add(BASE_URL); bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() .addAllBulletinBoardAddress(testDB) .setMinRedundancy((float) 1.0) .build()); postCallback = new PostCallback(); redundancyCallback = new RedundancyCallback((float) 1.0); thrown = new Vector<>(); jobSemaphore = new Semaphore(0); } /** * Closes the client and makes sure the test fails when an exception occurred in a separate thread */ @After public void close() { bulletinBoardClient.close(); if (thrown.size() > 0) { assert false; } } /** * Tests the standard post, redundancy and read methods */ @Test public void postTest() { byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24}; BulletinBoardMessage msg; MessageFilterList filterList; List msgList; MessageID messageID; msg = BulletinBoardMessage.newBuilder() .setMsg(UnsignedBulletinBoardMessage.newBuilder() .addTag("Signature") .addTag("Trustee") .setData(ByteString.copyFrom(b1)) .setTimestamp(Timestamp.newBuilder() .setSeconds(20) .setNanos(30) .build()) .build()) .addSig(Crypto.Signature.newBuilder() .setType(Crypto.SignatureType.DSA) .setData(ByteString.copyFrom(b2)) .setSignerId(ByteString.copyFrom(b3)) .build()) .addSig(Crypto.Signature.newBuilder() .setType(Crypto.SignatureType.ECDSA) .setData(ByteString.copyFrom(b3)) .setSignerId(ByteString.copyFrom(b2)) .build()) .build(); messageID = bulletinBoardClient.postMessage(msg,postCallback); try { jobSemaphore.acquire(); } catch (InterruptedException e) { System.err.println(e.getCause() + " " + e.getMessage()); } bulletinBoardClient.getRedundancy(messageID,redundancyCallback); filterList = MessageFilterList.newBuilder() .addFilter( MessageFilter.newBuilder() .setType(FilterType.TAG) .setTag("Signature") .build() ) .addFilter( MessageFilter.newBuilder() .setType(FilterType.TAG) .setTag("Trustee") .build() ) .build(); msgList = new LinkedList<>(); msgList.add(msg); readCallback = new ReadCallback(msgList); bulletinBoardClient.readMessages(filterList, readCallback); try { jobSemaphore.acquire(2); } catch (InterruptedException e) { System.err.println(e.getCause() + " " + e.getMessage()); } } /** * Tests posting a batch by parts * Also tests not being able to post to a closed batch * @throws CommunicationException, SignatureException, InterruptedException */ @Test public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { final int SIGNER = 1; final int BATCH_ID = 100; final int BATCH_LENGTH = 100; CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); // Begin batch bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback); jobSemaphore.acquire(); // Post data bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback); jobSemaphore.acquire(); // Close batch CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() .setBatchId(BATCH_ID) .setBatchLength(BATCH_LENGTH) .setTimestamp(Timestamp.newBuilder() .setSeconds(50) .setNanos(80) .build()) .setSig(completeBatch.getSignature()) .build(); bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); jobSemaphore.acquire(); // Attempt to open batch again bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback); // Attempt to add batch data bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback); jobSemaphore.acquire(2); // Read batch data BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder() .setSignerId(signerIDs[SIGNER]) .setBatchId(BATCH_ID) .setStartPosition(0) .build(); readBatchCallback = new ReadBatchCallback(completeBatch); bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); jobSemaphore.acquire(); } /** * Posts a complete batch message * Checks reading od the message * @throws CommunicationException, SignatureException, InterruptedException */ @Test public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { final int SIGNER = 0; final int BATCH_ID = 101; final int BATCH_LENGTH = 50; // Post batch CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); bulletinBoardClient.postBatch(completeBatch,postCallback); jobSemaphore.acquire(); // Read batch BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder() .setSignerId(signerIDs[SIGNER]) .setBatchId(BATCH_ID) .setStartPosition(0) .build(); readBatchCallback = new ReadBatchCallback(completeBatch); bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); jobSemaphore.acquire(); } /** * Tests that an unopened batch cannot be closed * @throws CommunicationException, InterruptedException */ @Test public void testInvalidBatchClose() throws CommunicationException, InterruptedException { final int NON_EXISTENT_BATCH_ID = 999; CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder() .setBatchId(NON_EXISTENT_BATCH_ID) .setBatchLength(1) .setSig(Crypto.Signature.getDefaultInstance()) .setTimestamp(Timestamp.newBuilder() .setSeconds(9) .setNanos(12) .build()) .build(); // Try to close the (unopened) batch; bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback); jobSemaphore.acquire(); } }