diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java index eb67672..9ce5ef4 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java @@ -77,52 +77,52 @@ public class BulletinClientWorker implements Callable { WebTarget webTarget; Response response; - job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations - String requestPath; Message msg; + Message payload = job.getPayload(); + BulletinBoardMessageList msgList; int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request. + job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations + // Prepare the request. switch(job.getJobType()) { case POST_MESSAGE: // Make sure the payload is a BulletinBoardMessage - if (!(job.getPayload() instanceof BulletinBoardMessage)) { + if (!(payload instanceof BulletinBoardMessage)) { throw new IllegalArgumentException("Cannot post an object that is not an instance of BulletinBoardMessage"); } - msg = job.getPayload(); + msg = payload; requestPath = Constants.POST_MESSAGE_PATH; break; case READ_MESSAGES: // Make sure the payload is a MessageFilterList - if (!(job.getPayload() instanceof MessageFilterList)) { + if (!(payload instanceof MessageFilterList)) { throw new IllegalArgumentException("Read failed: an instance of MessageFilterList is required as payload for a READ_MESSAGES operation"); } - msg = job.getPayload(); + msg = payload; requestPath = Constants.READ_MESSAGES_PATH; break; case GET_REDUNDANCY: - // Make sure the payload is a BulletinBoardMessage - if (!(job.getPayload() instanceof BulletinBoardMessage)) { + // Make sure the payload is a MessageId + if (!(payload instanceof MessageID)) { throw new IllegalArgumentException("Cannot search for an object that is not an instance of BulletinBoardMessage"); } requestPath = Constants.READ_MESSAGES_PATH; - // Create a MsgID from the - digest.update((BulletinBoardMessage) job.getPayload()); msg = MessageFilterList.newBuilder() .addFilter(MessageFilter.newBuilder() .setType(FilterType.MSG_ID) - .setId(ByteString.copyFrom(digest.digest())) + .setId(payload.toByteString()) .build() ).build(); diff --git a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java index f9699c1..b50d1e5 100644 --- a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java @@ -1,17 +1,24 @@ import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import meerkat.bulletinboard.BulletinBoardClient; -import meerkat.bulletinboard.SimpleBulletinBoardClient; -import meerkat.comm.CommunicationException; +import meerkat.bulletinboard.BulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.ThreadedBulletinBoardClient; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Crypto; +import meerkat.protobuf.Voting.*; import meerkat.util.BulletinBoardMessageComparator; + import org.junit.Before; import org.junit.Test; + +import static java.lang.Thread.sleep; import static org.junit.Assert.*; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.number.OrderingComparison.*; import java.util.Comparator; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -20,8 +27,74 @@ import java.util.List; */ public class BulletinBoardClientIntegrationTest { + private class PostCallback implements ClientCallback{ + + @Override + public void handleCallback(Object msg) {} + + @Override + public void handleFailure(Throwable t) { + System.err.println(t.getCause() + "\n" + t.getMessage()); + assert false; + } + } + + private class RedundancyCallback implements ClientCallback{ + + private float minRedundancy; + + public RedundancyCallback(float minRedundancy) { + this.minRedundancy = minRedundancy; + } + + @Override + public void handleCallback(Float redundancy) { + assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); + } + + @Override + public void handleFailure(Throwable t) { + System.err.println(t.getCause() + "\n" + t.getMessage()); + assert false; + } + } + + private class ReadCallback implements ClientCallback>{ + + private List expectedMsgList; + + public ReadCallback(List expectedMsgList) { + this.expectedMsgList = expectedMsgList; + } + + @Override + public void handleCallback(List messages) { + BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); + + assertThat(messages.size(), is(expectedMsgList.size())); + + Iterator expectedMessageIterator = expectedMsgList.iterator(); + Iterator receivedMessageIterator = messages.iterator(); + + while (expectedMessageIterator.hasNext()) { + assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); + } + + } + + @Override + public void handleFailure(Throwable t) { + System.err.println(t.getCause() + "\n" + t.getMessage()); + assert false; + } + } + private BulletinBoardClient bulletinBoardClient; + private PostCallback postCallback; + private RedundancyCallback redundancyCallback; + private ReadCallback readCallback; + private static String PROP_GETTY_URL = "gretty.httpBaseURI"; private static String DEFAULT_BASE_URL = "http://localhost:8081"; private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); @@ -29,17 +102,23 @@ public class BulletinBoardClientIntegrationTest { @Before public void init(){ - bulletinBoardClient = new SimpleBulletinBoardClient(); + bulletinBoardClient = new ThreadedBulletinBoardClient(); List testDB = new LinkedList(); testDB.add(BASE_URL); - bulletinBoardClient.init(testDB); + bulletinBoardClient.init(BulletinBoardClientParams.newBuilder() + .addBulletinBoardAddress("http://localhost:8081") + .setMinRedundancy((float) 1.0) + .build()); + + postCallback = new PostCallback(); + redundancyCallback = new RedundancyCallback((float) 1.0); } @Test - public void postTest(){ + public void postTest() { byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; @@ -73,15 +152,15 @@ public class BulletinBoardClientIntegrationTest { .build()) .build(); + messageID = bulletinBoardClient.postMessage(msg,postCallback); + try { - messageID = bulletinBoardClient.postMessage(msg); - } catch (CommunicationException e) { - System.err.println("Error posting to BB Server: " + e.getMessage()); - assert false; - return; + sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); } - assertThat(bulletinBoardClient.getRedundancy(messageID), is((float) 1.00)); + bulletinBoardClient.getRedundancy(messageID,redundancyCallback); filterList = MessageFilterList.newBuilder() .addFilter( @@ -90,19 +169,20 @@ public class BulletinBoardClientIntegrationTest { .setTag("Signature") .build() ) -// .addFilter( -// MessageFilter.newBuilder() -// .setType(FilterType.TAG) -// .setTag("Trustee") -// .build() -// ) + .addFilter( + MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag("Trustee") + .build() + ) .build(); - msgList = bulletinBoardClient.readMessages(filterList); + msgList = new LinkedList(); + msgList.add(msg); - assertThat(msgList.size(), is(1)); + readCallback = new ReadCallback(msgList); - assertThat(msgComparator.compare(msgList.iterator().next(), msg), is(0)); + bulletinBoardClient.readMessages(filterList, readCallback); }