From 4f2d0e77388312a3761fa9bc4c93d3071e33c6cc Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Sat, 12 Dec 2015 22:45:31 +0200 Subject: [PATCH] First working version of Threaded Bulletin Board Client. Tests do not report well. --- .../meerkat/bulletinboard/BulletinClientJob.java | 8 ++++---- .../bulletinboard/BulletinClientWorker.java | 11 ++++++++--- .../ThreadedBulletinBoardClient.java | 15 ++++++++++++++- .../java/BulletinBoardClientIntegrationTest.java | 6 +++++- .../bulletinboard/BulletinBoardClient.java | 6 ++++++ 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java index b63ca50..aca98d4 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java @@ -37,6 +37,10 @@ public class BulletinClientJob { this.maxRetry = maxRetry; } + public void updateServerAddresses(List newServerAdresses) { + this.serverAddresses = newServerAdresses; + } + public List getServerAddresses() { return serverAddresses; } @@ -57,10 +61,6 @@ public class BulletinClientJob { return maxRetry; } - public Iterator getAddressIterator() { - return serverAddresses.iterator(); - } - public void shuffleAddresses() { Collections.shuffle(serverAddresses); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java index 9ce5ef4..3b9c781 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java @@ -17,6 +17,8 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.Callable; /** @@ -80,6 +82,8 @@ public class BulletinClientWorker implements Callable { String requestPath; Message msg; + List serverAddresses = new LinkedList(job.getServerAddresses()); + Message payload = job.getPayload(); BulletinBoardMessageList msgList; @@ -114,7 +118,7 @@ public class BulletinClientWorker implements Callable { case GET_REDUNDANCY: // Make sure the payload is a MessageId if (!(payload instanceof MessageID)) { - throw new IllegalArgumentException("Cannot search for an object that is not an instance of BulletinBoardMessage"); + throw new IllegalArgumentException("Cannot search for an object that is not an instance of MessageID"); } requestPath = Constants.READ_MESSAGES_PATH; @@ -135,7 +139,7 @@ public class BulletinClientWorker implements Callable { // Iterate through servers - Iterator addressIterator = job.getAddressIterator(); + Iterator addressIterator = serverAddresses.iterator(); while (addressIterator.hasNext()) { @@ -192,7 +196,8 @@ public class BulletinClientWorker implements Callable { case POST_MESSAGE: // The job now contains the information required to ascertain whether enough server posts have succeeded - // It also contains the list of servers in which the post was not successful + // It will also contain the list of servers in which the post was not successful + job.updateServerAddresses(serverAddresses); return new BulletinClientJobResult(job, null); case GET_REDUNDANCY: diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index dd1ab0f..bb46c32 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -13,6 +13,7 @@ import meerkat.protobuf.Voting; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Created by Arbel Deutsch Peled on 05-Dec-15. @@ -107,7 +108,7 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient { public void readMessages(MessageFilterList filterList, ClientCallback> callback) { // Create job - BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, + BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.READ_MESSAGES, filterList, READ_MESSAGES_RETRY_NUM); // Submit job and create callback @@ -115,4 +116,16 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient { } + @Override + public void close() { + try { + listeningExecutor.shutdown(); + while (! listeningExecutor.isShutdown()) { + listeningExecutor.awaitTermination(10, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + } + } diff --git a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java index b50d1e5..3e33eea 100644 --- a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java @@ -30,7 +30,9 @@ public class BulletinBoardClientIntegrationTest { private class PostCallback implements ClientCallback{ @Override - public void handleCallback(Object msg) {} + public void handleCallback(Object msg) { + System.err.println("Post operation completed"); + } @Override public void handleFailure(Throwable t) { @@ -184,6 +186,8 @@ public class BulletinBoardClientIntegrationTest { bulletinBoardClient.readMessages(filterList, readCallback); + bulletinBoardClient.close(); + } } diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 1577527..c51e561 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -45,4 +45,10 @@ public interface BulletinBoardClient { */ void readMessages(MessageFilterList filterList, ClientCallback> callback); + /** + * Closes all connections, if any. + * This is done in a synchronous (blocking) way. + */ + void close(); + }