From 141d286af26dc0663da74376f002e8b18ba6f005 Mon Sep 17 00:00:00 2001 From: Arbel Deutsch Peled Date: Sun, 17 Jan 2016 10:59:05 +0200 Subject: [PATCH] Dual-layered threaded BB Client. Supports basic functionality. Does not support Batch Messages yet. --- .../bulletinboard/BulletinClientJob.java | 82 ------- .../BulletinClientJobResult.java | 29 --- .../bulletinboard/BulletinClientWorker.java | 224 ++--------------- .../bulletinboard/MultiServerWorker.java | 104 ++++++++ .../SingleServerBulletinBoardClient.java | 228 ++++++++++++++++++ .../bulletinboard/SingleServerWorker.java | 39 +++ .../ThreadedBulletinBoardClient.java | 95 ++++++-- .../callbacks/ClientFutureCallback.java | 19 -- .../GetRedundancyFutureCallback.java | 38 --- .../callbacks/PostMessageFutureCallback.java | 44 ---- .../callbacks/ReadMessagesFutureCallback.java | 35 --- .../MultiServerGetRedundancyWorker.java | 74 ++++++ .../workers/MultiServerPostBatchWorker.java | 7 + .../workers/MultiServerPostMessageWorker.java | 72 ++++++ .../MultiServerReadMessagesWorker.java | 65 +++++ .../SingleServerGetRedundancyWorker.java | 79 ++++++ .../SingleServerPostMessageWorker.java | 61 +++++ .../SingleServerReadMessagesWorker.java | 68 ++++++ .../BulletinBoardClientIntegrationTest.java | 4 +- meerkat-common/build.gradle | 2 +- .../AsyncBulletinBoardClient.java | 44 +++- 21 files changed, 927 insertions(+), 486 deletions(-) delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerWorker.java delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java delete mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerReadMessagesWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerGetRedundancyWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java create mode 100644 bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java deleted file mode 100644 index aca98d4..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java +++ /dev/null @@ -1,82 +0,0 @@ -package meerkat.bulletinboard; - -import com.google.protobuf.Message; - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -/** - * Created by Arbel Deutsch Peled on 09-Dec-15. - * - * This class specifies the job that is required of a Bulletin Board Client Worker - */ -public class BulletinClientJob { - - public static enum JobType{ - POST_MESSAGE, // Post a message to servers - READ_MESSAGES, // Read messages according to some given filter (any server will do) - GET_REDUNDANCY // Check the redundancy of a specific message in the databases - } - - private List serverAddresses; - - private int minServers; // The minimal number of servers the job must be successful on for the job to be completed - - private final JobType jobType; - - private final Message payload; // The information associated with the job type - - private int maxRetry; // Number of retries for this job; set to -1 for infinite retries - - public BulletinClientJob(List serverAddresses, int minServers, JobType jobType, Message payload, int maxRetry) { - this.serverAddresses = serverAddresses; - this.minServers = minServers; - this.jobType = jobType; - this.payload = payload; - this.maxRetry = maxRetry; - } - - public void updateServerAddresses(List newServerAdresses) { - this.serverAddresses = newServerAdresses; - } - - public List getServerAddresses() { - return serverAddresses; - } - - public int getMinServers() { - return minServers; - } - - public JobType getJobType() { - return jobType; - } - - public Message getPayload() { - return payload; - } - - public int getMaxRetry() { - return maxRetry; - } - - public void shuffleAddresses() { - Collections.shuffle(serverAddresses); - } - - public void decMinServers(){ - minServers--; - } - - public void decMaxRetry(){ - if (maxRetry > 0) { - maxRetry--; - } - } - - public boolean isRetry(){ - return (maxRetry != 0); - } - -} \ No newline at end of file diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java deleted file mode 100644 index be0501b..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java +++ /dev/null @@ -1,29 +0,0 @@ -package meerkat.bulletinboard; - -import com.google.protobuf.Message; - -/** - * Created by Arbel Deutsch Peled on 09-Dec-15. - * - * This class contains the end status and result of a Bulletin Board Client Job. - */ -public final class BulletinClientJobResult { - - private final BulletinClientJob job; // Stores the job the result refers to - - private final Message result; // The result of the job; valid only if success==true - - public BulletinClientJobResult(BulletinClientJob job, Message result) { - this.job = job; - this.result = result; - } - - public BulletinClientJob getJob() { - return job; - } - - public Message getResult() { - return result; - } - -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java index f03ab31..dba596b 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java @@ -1,218 +1,38 @@ package meerkat.bulletinboard; -import com.google.protobuf.Message; -import meerkat.comm.CommunicationException; -import meerkat.crypto.Digest; -import meerkat.crypto.concrete.SHA256Digest; -import meerkat.protobuf.BulletinBoardAPI.*; -import meerkat.rest.Constants; -import meerkat.rest.ProtobufMessageBodyReader; -import meerkat.rest.ProtobufMessageBodyWriter; -import static meerkat.bulletinboard.BulletinBoardConstants.*; - -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; - /** * Created by Arbel Deutsch Peled on 09-Dec-15. * - * This class implements the actual communication with the Bulletin Board Servers. + * This class handles bulletin client work. * It is meant to be used in a multi-threaded environment. */ -//TODO: Maybe make this abstract and inherit from it. -public class BulletinClientWorker implements Callable { +public abstract class BulletinClientWorker { - private final BulletinClientJob job; // The requested job to be handled + protected IN payload; // Payload of the job - public BulletinClientWorker(BulletinClientJob job){ - this.job = job; + private int maxRetry; // Number of retries for this job; set to -1 for infinite retries + + public BulletinClientWorker(IN payload, int maxRetry) { + this.payload = payload; + this.maxRetry = maxRetry; } - // This resource enabled creation of a single Client per thread. - private static final ThreadLocal clientLocal = - new ThreadLocal () { - @Override protected Client initialValue() { - Client client; - client = ClientBuilder.newClient(); - client.register(ProtobufMessageBodyReader.class); - client.register(ProtobufMessageBodyWriter.class); + public IN getPayload() { + return payload; + } - return client; - } - }; - - // This resource enables creation of a single Digest per thread. - private static final ThreadLocal digestLocal = - new ThreadLocal () { - @Override protected Digest initialValue() { - Digest digest; - digest = new SHA256Digest(); //TODO: Make this generic. - - return digest; - } - }; - - /** - * This method carries out the actual communication with the servers via HTTP Post - * It accesses the servers according to the job it received and updates said job as it goes - * The method will only iterate once through the server list, removing servers from the list when they are no longer required - * In a POST_MESSAGE job: successful post to a server results in removing the server from the list - * In a GET_REDUNDANCY job: no server is removed from the list and the (absolute) number of servers in which the message was found is returned - * In a READ_MESSAGES job: successful retrieval from any server terminates the method and returns the received values; The list is not changed - * @return The original job, modified to fit the current state and the required output (if any) of the operation - * @throws IllegalArgumentException - * @throws CommunicationException - */ - public BulletinClientJobResult call() throws IllegalArgumentException, CommunicationException{ - - Client client = clientLocal.get(); - Digest digest = digestLocal.get(); - - WebTarget webTarget; - Response response; - - String requestPath; - Message msg; - - List serverAddresses = new LinkedList(job.getServerAddresses()); - - Message payload = job.getPayload(); - - BulletinBoardMessageList msgList; - - int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request. - - job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations - - // Prepare the request. - switch(job.getJobType()) { - - case POST_MESSAGE: - // Make sure the payload is a BulletinBoardMessage - if (!(payload instanceof BulletinBoardMessage)) { - throw new IllegalArgumentException("Cannot post an object that is not an instance of BulletinBoardMessage"); - } - - msg = payload; - requestPath = POST_MESSAGE_PATH; - break; - - case READ_MESSAGES: - // Make sure the payload is a MessageFilterList - if (!(payload instanceof MessageFilterList)) { - throw new IllegalArgumentException("Read failed: an instance of MessageFilterList is required as payload for a READ_MESSAGES operation"); - } - - msg = payload; - requestPath = READ_MESSAGES_PATH; - break; - - case GET_REDUNDANCY: - // Make sure the payload is a MessageId - if (!(payload instanceof MessageID)) { - throw new IllegalArgumentException("Cannot search for an object that is not an instance of MessageID"); - } - - requestPath = READ_MESSAGES_PATH; - - msg = MessageFilterList.newBuilder() - .addFilter(MessageFilter.newBuilder() - .setType(FilterType.MSG_ID) - .setId(((MessageID) payload).getID()) - .build() - ).build(); - - break; - - default: - throw new IllegalArgumentException("Unsupported job type"); - - } - - // Iterate through servers - - Iterator addressIterator = serverAddresses.iterator(); - - while (addressIterator.hasNext()) { - - // Send request to Server - String address = addressIterator.next(); - webTarget = client.target(address).path(BULLETIN_BOARD_SERVER_PATH).path(requestPath); - response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF)); - - // Retrieve answer - switch(job.getJobType()) { - - case POST_MESSAGE: - try { - - response.readEntity(BoolMsg.class); // If a BoolMsg entity is returned: the post was successful - addressIterator.remove(); // Post to this server succeeded: remove server from list - job.decMinServers(); - - } catch (ProcessingException | IllegalStateException e) {} // Post to this server failed: retry next time - finally { - response.close(); - } - break; - - case GET_REDUNDANCY: - try { - msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful - - if (msgList.getMessageList().size() > 0){ // Message was found in the server. - count++; - } - } catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server - finally { - response.close(); - } - break; - - case READ_MESSAGES: - try { - msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful - return new BulletinClientJobResult(job, msgList); // Return the result - } catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server - finally { - response.close(); - } - break; - - } - - } - - // Return result (if haven't done so yet) - switch(job.getJobType()) { - - case POST_MESSAGE: - // The job now contains the information required to ascertain whether enough server posts have succeeded - // It will also contain the list of servers in which the post was not successful - job.updateServerAddresses(serverAddresses); - return new BulletinClientJobResult(job, null); - - case GET_REDUNDANCY: - // Return the number of servers in which the message was found - // The job now contains the list of these servers - return new BulletinClientJobResult(job, IntMsg.newBuilder().setValue(count).build()); - - case READ_MESSAGES: - // A successful operation would have already returned an output - // Therefore: no server access was successful - throw new CommunicationException("Could not access any server"); - - default: // This is required for successful compilation - throw new IllegalArgumentException("Unsupported job type"); + public int getMaxRetry() { + return maxRetry; + } + public void decMaxRetry(){ + if (maxRetry > 0) { + maxRetry--; } } + + public boolean isRetry(){ + return (maxRetry != 0); + } + } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java new file mode 100644 index 0000000..8db836f --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/MultiServerWorker.java @@ -0,0 +1,104 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by Arbel Deutsch Peled on 09-Dec-15. + * + * This is a general class for handling multi-server work + * It utilizes Single Server Clients to perform the actual per-server work + */ +public abstract class MultiServerWorker extends BulletinClientWorker implements Runnable, ClientCallback{ + + private List clients; + + protected AtomicInteger minServers; // The minimal number of servers the job must be successful on for the job to be completed + + protected AtomicInteger maxFailedServers; // The maximal number of allowed server failures + + private AtomicBoolean returnedResult; + + private ClientCallback clientCallback; + + /** + * Constructor + * @param clients contains a list of Single Server clients to handle requests + * @param shuffleClients is a boolean stating whether or not it is needed to shuffle the clients + * @param minServers is the minimal amount of servers needed in order to successfully complete the job + * @param payload is the payload for the job + * @param maxRetry is the maximal per-server retry count + * @param clientCallback contains the callback methods used to report the result back to the client + */ + public MultiServerWorker(List clients, boolean shuffleClients, + int minServers, IN payload, int maxRetry, + ClientCallback clientCallback) { + + super(payload,maxRetry); + + this.clients = clients; + if (shuffleClients){ + Collections.shuffle(clients); + } + + this.minServers = new AtomicInteger(minServers); + maxFailedServers = new AtomicInteger(clients.size() - minServers); + this.clientCallback = clientCallback; + + returnedResult = new AtomicBoolean(false); + + } + + /** + * Constructor overload without client shuffling + */ + public MultiServerWorker(List clients, + int minServers, IN payload, int maxRetry, + ClientCallback clientCallback) { + + this(clients, false, minServers, payload, maxRetry, clientCallback); + + } + + /** + * Used to report a successful operation to the client + * Only reports once to the client + * @param result is the result + */ + protected void succeed(OUT result){ + if (returnedResult.compareAndSet(false, true)) { + clientCallback.handleCallback(result); + } + } + + /** + * Used to report a failed operation to the client + * Only reports once to the client + * @param t contains the error/exception that occurred + */ + protected void fail(Throwable t){ + if (returnedResult.compareAndSet(false, true)) { + clientCallback.handleFailure(t); + } + } + + /** + * Used by implementations to get a Single Server Client iterator + * @return the requested iterator + */ + protected Iterator getClientIterator() { + return clients.iterator(); + } + + protected int getClientNumber() { + return clients.size(); + } + +} \ No newline at end of file diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java new file mode 100644 index 0000000..e92f35b --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerBulletinBoardClient.java @@ -0,0 +1,228 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import meerkat.bulletinboard.workers.SingleServerGetRedundancyWorker; +import meerkat.bulletinboard.workers.SingleServerPostMessageWorker; +import meerkat.bulletinboard.workers.SingleServerReadMessagesWorker; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting.BulletinBoardClientParams; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Created by Arbel Deutsch Peled on 28-Dec-15. + * + * This class implements the asynchronous Bulletin Board Client interface + * It only handles a single Bulletin Board Server + * If the list of servers contains more than one server: the server actually used is the first one + * The class further implements a delayed access to the server after a communication error occurs + */ +public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient { + + private final int MAX_RETRIES = 10; + + protected ListeningScheduledExecutorService executorService; + + private long lastServerErrorTime; + + protected final long failDelayInMilliseconds; + + /** + * Notify the client that a job has failed + * This makes new scheduled jobs be scheduled for a later time (after the given delay) + */ + protected void fail() { + + // Update last fail time + lastServerErrorTime = System.currentTimeMillis(); + + } + + /** + * This method adds a worker to the scheduled queue of the threadpool + * If the server is in an accessible state: the job is submitted for immediate handling + * If the server is not accessible: the job is scheduled for a later time + * @param worker is the worker that should be scheduled for work + * @param callback is the class containing callbacks for handling job completion/failure + */ + protected void scheduleWorker(SingleServerWorker worker, FutureCallback callback){ + + long timeSinceLastServerError = System.currentTimeMillis() - lastServerErrorTime; + + if (timeSinceLastServerError >= failDelayInMilliseconds) { + + // Schedule for immediate processing + Futures.addCallback(executorService.submit(worker), callback); + + } else { + + // Schedule for processing immediately following delay expiry + Futures.addCallback(executorService.schedule( + worker, + failDelayInMilliseconds - timeSinceLastServerError, + TimeUnit.MILLISECONDS), + callback); + + } + + } + + /** + * Inner class for handling simple operation results and retrying if needed + */ + class RetryCallback implements FutureCallback { + + private SingleServerWorker worker; + private ClientCallback clientCallback; + + public RetryCallback(SingleServerWorker worker, ClientCallback clientCallback) { + this.worker = worker; + this.clientCallback = clientCallback; + } + + @Override + public void onSuccess(T result) { + clientCallback.handleCallback(result); + } + + @Override + public void onFailure(Throwable t) { + + // Notify client about failure + fail(); + + // Check if another attempt should be made + + worker.decMaxRetry(); + + if (worker.isRetry()) { + // Perform another attempt + scheduleWorker(worker, this); + } else { + // No more retries: notify caller about failure + clientCallback.handleFailure(t); + } + + } + + } + + + public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds) { + + executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); + + this.failDelayInMilliseconds = failDelayInMilliseconds; + + // Set server error time to a time sufficiently in the past to make new jobs go through + lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds; + + } + + /** + * Stores database location, initializes the web Client and + * @param clientParams contains the data needed to access the DBs + */ + @Override + public void init(BulletinBoardClientParams clientParams) { + + // Perform usual setup + super.init(clientParams); + + // Remove all but first DB address + String dbAddress = meerkatDBs.get(0); + meerkatDBs = new LinkedList(); + meerkatDBs.add(dbAddress); + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback) { + + // Create worker with redundancy 1 and MAX_RETRIES retries + SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES); + + // Submit worker and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); + + // Calculate the correct message ID and return it + digest.reset(); + digest.update(msg.getMsg()); + return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); + + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { + return null; + } + + @Override + public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + + } + + @Override + public void getRedundancy(MessageID id, ClientCallback callback) { + + // Create worker with no retries + SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1); + + // Submit job and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); + + } + + @Override + public void readMessages(MessageFilterList filterList, ClientCallback> callback) { + + // Create job with no retries + SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1); + + // Submit job and create callback + scheduleWorker(worker, new RetryCallback(worker, callback)); + + } + + @Override + public void readBatch(byte[] signerId, int batchId, ClientCallback callback) { + + } + + @Override + public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) { + + } + + @Override + public void close() { + + super.close(); + + executorService.shutdown(); + + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerWorker.java new file mode 100644 index 0000000..82ca886 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SingleServerWorker.java @@ -0,0 +1,39 @@ +package meerkat.bulletinboard; + +import meerkat.rest.ProtobufMessageBodyReader; +import meerkat.rest.ProtobufMessageBodyWriter; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import java.util.concurrent.Callable; + +/** + * Created by Arbel Deutsch Peled on 02-Jan-16. + */ +public abstract class SingleServerWorker extends BulletinClientWorker implements Callable{ + + // This resource enabled creation of a single Client per thread. + protected static final ThreadLocal clientLocal = + new ThreadLocal () { + @Override protected Client initialValue() { + Client client; + client = ClientBuilder.newClient(); + client.register(ProtobufMessageBodyReader.class); + client.register(ProtobufMessageBodyWriter.class); + + return client; + } + }; + + protected String serverAddress; + + public SingleServerWorker(String serverAddress, IN payload, int maxRetry) { + super(payload, maxRetry); + this.serverAddress = serverAddress; + } + + public String getServerAddress() { + return serverAddress; + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java index 71d852e..ce7a01e 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -1,15 +1,17 @@ package meerkat.bulletinboard; -import com.google.common.util.concurrent.*; import com.google.protobuf.ByteString; -import meerkat.bulletinboard.callbacks.GetRedundancyFutureCallback; -import meerkat.bulletinboard.callbacks.PostMessageFutureCallback; -import meerkat.bulletinboard.callbacks.ReadMessagesFutureCallback; + +import meerkat.bulletinboard.workers.MultiServerGetRedundancyWorker; +import meerkat.bulletinboard.workers.MultiServerPostMessageWorker; +import meerkat.bulletinboard.workers.MultiServerReadMessagesWorker; import meerkat.comm.CommunicationException; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -22,10 +24,19 @@ import java.util.concurrent.TimeUnit; */ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient { - private final static int THREAD_NUM = 10; - ListeningExecutorService listeningExecutor; + // Executor service for handling jobs + private final static int JOBS_THREAD_NUM = 5; + ExecutorService executorService; + // Per-server clients + List clients; + + private final static int POST_MESSAGE_RETRY_NUM = 3; private final static int READ_MESSAGES_RETRY_NUM = 1; + private final static int GET_REDUNDANCY_RETRY_NUM = 1; + + private static final int SERVER_THREADPOOL_SIZE = 5; + private static final long FAIL_DELAY = 5000; private int minAbsoluteRedundancy; @@ -42,7 +53,16 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount()); - listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM)); + executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM); + + clients = new ArrayList(clientParams.getBulletinBoardAddressCount()); + for (String address : clientParams.getBulletinBoardAddressList()){ + SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY); + client.init(BulletinBoardClientParams.newBuilder() + .addBulletinBoardAddress(address) + .build()); + clients.add(client); + } } @@ -54,28 +74,52 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple * @throws CommunicationException */ @Override - public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback){ + public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback){ // Create job - BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.POST_MESSAGE, msg, -1); + MultiServerPostMessageWorker worker = + new MultiServerPostMessageWorker(clients, minAbsoluteRedundancy, msg, POST_MESSAGE_RETRY_NUM, callback); - // Submit job and create callback - Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new PostMessageFutureCallback(listeningExecutor, callback)); + // Submit job + executorService.submit(worker); // Calculate the correct message ID and return it digest.reset(); digest.update(msg.getMsg()); return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); + } @Override - public MessageID postBatch(byte[] signerId, int batchId, List batchDataList, int startPosition, ClientCallback callback) { - return null; //TODO: Implement + public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback) { + + return null; // TODO: write this + } @Override - public MessageID postBatch(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { - return null; //TODO: Implement + public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback) { + // TODO: write this + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback) { + + // TODO: write this + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback) { + + postBatchData(signerId, batchId, batchDataList, 0, callback); // Write batch from beginning + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback) { + // TODO: write this } /** @@ -88,10 +132,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple public void getRedundancy(MessageID id, ClientCallback callback) { // Create job - BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, id, 1); + MultiServerGetRedundancyWorker worker = + new MultiServerGetRedundancyWorker(clients, minAbsoluteRedundancy, id, GET_REDUNDANCY_RETRY_NUM, callback); - // Submit job and create callback - Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new GetRedundancyFutureCallback(listeningExecutor, callback)); + // Submit job + executorService.submit(worker); } @@ -104,11 +149,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple public void readMessages(MessageFilterList filterList, ClientCallback> callback) { // Create job - BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.READ_MESSAGES, - filterList, READ_MESSAGES_RETRY_NUM); + MultiServerReadMessagesWorker worker = + new MultiServerReadMessagesWorker(clients, minAbsoluteRedundancy, filterList, READ_MESSAGES_RETRY_NUM, callback); - // Submit job and create callback - Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new ReadMessagesFutureCallback(listeningExecutor, callback)); + // Submit job + executorService.submit(worker); } @@ -127,9 +172,9 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple super.close(); try { - listeningExecutor.shutdown(); - while (! listeningExecutor.isShutdown()) { - listeningExecutor.awaitTermination(10, TimeUnit.SECONDS); + executorService.shutdown(); + while (! executorService.isShutdown()) { + executorService.awaitTermination(10, TimeUnit.SECONDS); } } catch (InterruptedException e) { System.err.println(e.getCause() + " " + e.getMessage()); diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java deleted file mode 100644 index 54cc63a..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java +++ /dev/null @@ -1,19 +0,0 @@ -package meerkat.bulletinboard.callbacks; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListeningExecutorService; -import meerkat.bulletinboard.BulletinClientJobResult; - -/** - * This is a future callback used to listen to workers and run on job finish - * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error - */ -public abstract class ClientFutureCallback implements FutureCallback { - - protected ListeningExecutorService listeningExecutor; - - ClientFutureCallback(ListeningExecutorService listeningExecutor) { - this.listeningExecutor = listeningExecutor; - } - -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java deleted file mode 100644 index 719428f..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java +++ /dev/null @@ -1,38 +0,0 @@ -package meerkat.bulletinboard.callbacks; - -import com.google.common.util.concurrent.ListeningExecutorService; -import meerkat.bulletinboard.AsyncBulletinBoardClient.*; -import meerkat.bulletinboard.BulletinClientJobResult; -import meerkat.protobuf.BulletinBoardAPI.*; - -import java.util.List; - -/** - * This is a future callback used to listen to workers and run on job finish - * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error - */ -public class GetRedundancyFutureCallback extends ClientFutureCallback { - - private ClientCallback callback; - - public GetRedundancyFutureCallback(ListeningExecutorService listeningExecutor, - ClientCallback callback) { - super(listeningExecutor); - this.callback = callback; - } - - @Override - public void onSuccess(BulletinClientJobResult result) { - - int absoluteRedundancy = ((IntMsg) result.getResult()).getValue(); - int totalServers = result.getJob().getServerAddresses().size(); - - callback.handleCallback( ((float) absoluteRedundancy) / ((float) totalServers) ); - - } - - @Override - public void onFailure(Throwable t) { - callback.handleFailure(t); - } -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java deleted file mode 100644 index abd4247..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java +++ /dev/null @@ -1,44 +0,0 @@ -package meerkat.bulletinboard.callbacks; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListeningExecutorService; -import meerkat.bulletinboard.AsyncBulletinBoardClient.*; -import meerkat.bulletinboard.BulletinClientJob; -import meerkat.bulletinboard.BulletinClientJobResult; -import meerkat.bulletinboard.BulletinClientWorker; - - -/** - * This is a future callback used to listen to workers and run on job finish - * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error - */ -public class PostMessageFutureCallback extends ClientFutureCallback { - - private ClientCallback callback; - - public PostMessageFutureCallback(ListeningExecutorService listeningExecutor, - ClientCallback callback) { - super(listeningExecutor); - this.callback = callback; - } - - @Override - public void onSuccess(BulletinClientJobResult result) { - - BulletinClientJob job = result.getJob(); - - job.decMaxRetry(); - - // If redundancy is below threshold: retry - if (job.getMinServers() > 0 && job.isRetry()) { - Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), this); - } - - callback.handleCallback(null); - } - - @Override - public void onFailure(Throwable t) { - callback.handleFailure(t); - } -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java deleted file mode 100644 index 808b7a6..0000000 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java +++ /dev/null @@ -1,35 +0,0 @@ -package meerkat.bulletinboard.callbacks; - -import com.google.common.util.concurrent.ListeningExecutorService; -import meerkat.bulletinboard.AsyncBulletinBoardClient.*; -import meerkat.bulletinboard.BulletinClientJobResult; -import meerkat.protobuf.BulletinBoardAPI; - -import java.util.List; - -/** - * This is a future callback used to listen to workers and run on job finish - * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error - */ -public class ReadMessagesFutureCallback extends ClientFutureCallback { - - private ClientCallback> callback; - - public ReadMessagesFutureCallback(ListeningExecutorService listeningExecutor, - ClientCallback> callback) { - super(listeningExecutor); - this.callback = callback; - } - - @Override - public void onSuccess(BulletinClientJobResult result) { - - callback.handleCallback(((BulletinBoardAPI.BulletinBoardMessageList) result.getResult()).getMessageList()); - - } - - @Override - public void onFailure(Throwable t) { - callback.handleFailure(t); - } -} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java new file mode 100644 index 0000000..4d5e7f2 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerGetRedundancyWorker.java @@ -0,0 +1,74 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.MultiServerWorker; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerGetRedundancyWorker extends MultiServerWorker { + + private AtomicInteger serversContainingMessage; + private AtomicInteger totalContactedServers; + + public MultiServerGetRedundancyWorker(List clients, + int minServers, MessageID payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load + + serversContainingMessage = new AtomicInteger(0); + totalContactedServers = new AtomicInteger(0); + + } + + /** + * This method carries out the actual communication with the servers via HTTP Post + * It accesses the servers in a random order until one answers it + * Successful retrieval from any server terminates the method and returns the received values; The list is not changed + * @return The original job and the list of messages found in the first server that answered the query + * @throws CommunicationException + */ + public void run(){ + + Iterator clientIterator = getClientIterator(); + + // Iterate through clients + + while (clientIterator.hasNext()) { + + SingleServerBulletinBoardClient client = clientIterator.next(); + + // Send request to client + client.getRedundancy(payload,this); + + } + + } + + @Override + public void handleCallback(Float result) { + + if (result > 0.5) { + serversContainingMessage.incrementAndGet(); + } + + if (totalContactedServers.incrementAndGet() >= getClientNumber()){ + succeed(new Float(((float) serversContainingMessage.get()) / ((float) getClientNumber()) )); + } + + } + + @Override + public void handleFailure(Throwable t) { + handleCallback(new Float(0.0)); + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java new file mode 100644 index 0000000..cfc34e7 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostBatchWorker.java @@ -0,0 +1,7 @@ +package meerkat.bulletinboard.workers; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerPostBatchWorker { +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java new file mode 100644 index 0000000..dcdc82a --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerPostMessageWorker.java @@ -0,0 +1,72 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.MultiServerWorker; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.util.Iterator; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerPostMessageWorker extends MultiServerWorker { + + public MultiServerPostMessageWorker(List clients, + int minServers, BulletinBoardMessage payload, int maxRetry, + ClientCallback clientCallback) { + + super(clients, minServers, payload, maxRetry, clientCallback); + + } + + /** + * This method carries out the actual communication with the servers via HTTP Post + * It accesses the servers one by one and tries to post the payload to each in turn + * The method will only iterate once through the server list + * Successful post to a server results in removing the server from the list + * @return The original job, but with a modified server list + * @throws CommunicationException + */ + public void run() { + + WebTarget webTarget; + Response response; + + int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request. + + // Iterate through servers + + Iterator clientIterator = getClientIterator(); + + while (clientIterator.hasNext()) { + + // Send request to Server + SingleServerBulletinBoardClient client = clientIterator.next(); + + client.postMessage(payload, this); + + } + + } + + @Override + public void handleCallback(Boolean result) { + if (result){ + if (minServers.decrementAndGet() <= 0){ + succeed(Boolean.TRUE); + } + } + } + + @Override + public void handleFailure(Throwable t) { + if (maxFailedServers.decrementAndGet() < 0){ + fail(t); + } + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerReadMessagesWorker.java new file mode 100644 index 0000000..bf19526 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/MultiServerReadMessagesWorker.java @@ -0,0 +1,65 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback; +import meerkat.bulletinboard.MultiServerWorker; +import meerkat.bulletinboard.SingleServerBulletinBoardClient; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.Iterator; +import java.util.List; + + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class MultiServerReadMessagesWorker extends MultiServerWorker>{ + + private Iterator clientIterator; + + public MultiServerReadMessagesWorker(List clients, + int minServers, MessageFilterList payload, int maxRetry, + ClientCallback> clientCallback) { + + super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load + + clientIterator = getClientIterator(); + + } + + /** + * This method carries out the actual communication with the servers via HTTP Post + * It accesses the servers in a random order until one answers it + * Successful retrieval from any server terminates the method and returns the received values; The list is not changed + * @return The original job and the list of messages found in the first server that answered the query + * @throws CommunicationException + */ + public void run(){ + + // Iterate through servers + + if (clientIterator.hasNext()) { + + // Send request to Server + SingleServerBulletinBoardClient client = clientIterator.next(); + + // Retrieve answer + client.readMessages(payload,this); + + } else { + fail(new CommunicationException("Could not contact any server")); + } + + } + + @Override + public void handleCallback(List msg) { + succeed(msg); + } + + @Override + public void handleFailure(Throwable t) { + run(); // Retry with next server + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerGetRedundancyWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerGetRedundancyWorker.java new file mode 100644 index 0000000..a02da03 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerGetRedundancyWorker.java @@ -0,0 +1,79 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class SingleServerGetRedundancyWorker extends SingleServerWorker { + + public SingleServerGetRedundancyWorker(String serverAddress, MessageID payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + /** + * This method carries out the actual communication with the server via HTTP Post + * It queries the server for a message with the given ID + * @return TRUE if the message exists in the server and FALSE otherwise + * @throws CommunicationException if the server does not return a valid answer + */ + public Float call() throws CommunicationException{ + + Client client = clientLocal.get(); + + WebTarget webTarget; + Response response; + + MessageFilterList msgFilterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(payload.getID()) + .build() + ).build(); + + // Send request to Server + + webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msgFilterList, Constants.MEDIATYPE_PROTOBUF)); + + // Retrieve answer + + try { + + // If a BulletinBoardMessageList is returned: the read was successful + BulletinBoardMessageList msgList = response.readEntity(BulletinBoardMessageList.class); + + if (msgList.getMessageList().size() > 0){ + // Message exists in the server + return new Float(1.0); + } + else { + // Message does not exist in the server + return new Float(0.0); + } + + } catch (ProcessingException | IllegalStateException e) { + + // Read failed + throw new CommunicationException("Server access failed"); + + } + finally { + response.close(); + } + + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java new file mode 100644 index 0000000..ee9fd3f --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerPostMessageWorker.java @@ -0,0 +1,61 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.BoolMsg; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + * Tries to contact server once and perform a post operation + */ +public class SingleServerPostMessageWorker extends SingleServerWorker { + + public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + /** + * This method carries out the actual communication with the server via HTTP Post + * It accesses the server and tries to post the payload to it + * Successful post to a server results + * @return TRUE if the operation is successful + * @throws CommunicationException if the operation is unseccessful + */ + public Boolean call() throws CommunicationException{ + + Client client = clientLocal.get(); + + WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH);; + Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post( + Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF)); + + try { + + // If a BoolMsg entity is returned: the post was successful + response.readEntity(BoolMsg.class); + return Boolean.TRUE; + + } catch (ProcessingException | IllegalStateException e) { + + // Post to this server failed + throw new CommunicationException("Could not contact the server"); + + } + finally { + response.close(); + } + + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java new file mode 100644 index 0000000..f8975cb --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/workers/SingleServerReadMessagesWorker.java @@ -0,0 +1,68 @@ +package meerkat.bulletinboard.workers; + +import meerkat.bulletinboard.SingleServerWorker; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList; +import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; +import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage; +import meerkat.rest.Constants; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import java.util.List; + +import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH; +import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH; + +/** + * Created by Arbel Deutsch Peled on 27-Dec-15. + */ +public class SingleServerReadMessagesWorker extends SingleServerWorker> { + + public SingleServerReadMessagesWorker(String serverAddress, MessageFilterList payload, int maxRetry) { + super(serverAddress, payload, maxRetry); + } + + /** + * This method carries out the actual communication with the server via HTTP Post + * Upon successful retrieval from the server the method returns the received values + * @return The list of messages returned by the server + * @throws CommunicationException if the server's response is invalid + */ + public List call() throws CommunicationException{ + + Client client = clientLocal.get(); + + WebTarget webTarget; + Response response; + + // Send request to Server + webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH); + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post( + Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF)); + + // Retrieve answer + + try { + + // If a BulletinBoardMessageList is returned: the read was successful + return response.readEntity(BulletinBoardMessageList.class).getMessageList(); + + } catch (ProcessingException | IllegalStateException e) { + + // Read failed + throw new CommunicationException("Could not contact the server"); + + } + finally { + response.close(); + } + + + } + +} diff --git a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java index d7ae69c..7bdbe08 100644 --- a/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java +++ b/bulletin-board-client/src/test/java/BulletinBoardClientIntegrationTest.java @@ -32,10 +32,10 @@ public class BulletinBoardClientIntegrationTest { jobSemaphore.release(); } - private class PostCallback implements ClientCallback{ + private class PostCallback implements ClientCallback{ @Override - public void handleCallback(Object msg) { + public void handleCallback(Boolean msg) { System.err.println("Post operation completed"); jobSemaphore.release(); } diff --git a/meerkat-common/build.gradle b/meerkat-common/build.gradle index c510e0a..3783531 100644 --- a/meerkat-common/build.gradle +++ b/meerkat-common/build.gradle @@ -46,7 +46,7 @@ dependencies { compile 'com.google.protobuf:protobuf-java:3.+' // ListeningExecutor - compile 'com.google.guava:guava:11.0.+' + compile 'com.google.guava:guava:15.0' // Crypto compile 'org.factcenter.qilin:qilin:1.2+' diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java index 1663f7a..00bd2ac 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/AsyncBulletinBoardClient.java @@ -1,6 +1,7 @@ package meerkat.bulletinboard; import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Crypto.Signature; import java.util.List; @@ -24,23 +25,48 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient { * @param callback is a class containing methods to handle the result of the operation * @return a unique message ID for the message, that can be later used to retrieve the batch */ - public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback); + public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback); /** - * This method allows for sending large messages as a batch to the bulletin board + * Perform an end-to-end post of a signed batch message + * @param completeBatch contains all the data of the batch including the meta-data and the signature + * @param callback is a class containing methods to handle the result of the operation + * @return a unique identifier for the batch message + */ + public MessageID postBatch(CompleteBatch completeBatch, ClientCallback callback); + + /** + * This message informs the server about the existence of a new batch message and supplies it with the tags associated with it * @param signerId is the canonical form for the ID of the sender of this batch * @param batchId is a unique (per signer) ID for this batch - * @param batchDataList is the (canonically ordered) list of data comprising the batch message - * @param startPosition is the location (in the batch) of the first entry in batchDataList (optionally used to continue interrupted post operations) - * @param callback is a callback function class for handling results of the operation - * @return a unique message ID for the entire message, that can be later used to retrieve the batch + * @param tagList is a list of tags that belong to the batch message */ - public MessageID postBatch(byte[] signerId, int batchId, List batchDataList, int startPosition, ClientCallback callback); + public void beginBatch(byte[] signerId, int batchId, List tagList, ClientCallback callback); /** - * Overloading of the postBatch method in which startPosition is set to the default value 0 + * This method posts batch data into an (assumed to be open) batch + * It does not close the batch + * @param signerId is the canonical form for the ID of the sender of this batch + * @param batchId is a unique (per signer) ID for this batch + * @param batchDataList is the (canonically ordered) list of data comprising the entire batch message (not just the portion to be written) + * @param startPosition is the location (in the batch) of the first entry in batchDataList + * (optionally used to continue interrupted post operations) + * @param callback is a callback function class for handling results of the operation */ - public MessageID postBatch(byte[] signerId, int batchId, List batchDataList, ClientCallback callback); + public void postBatchData(byte[] signerId, int batchId, List batchDataList, + int startPosition, ClientCallback callback); + + /** + * Overloading of the postBatchData method in which startPosition is set to the default value 0 + */ + public void postBatchData(byte[] signerId, int batchId, List batchDataList, ClientCallback callback); + + /** + * Attempts to close a batch message + * @param closeBatchMessage contains the data required to close the batch + * @param callback is a callback function class for handling results of the operation + */ + public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback callback); /** * Check how "safe" a given message is in an asynchronous manner