diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java new file mode 100644 index 0000000..b63ca50 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJob.java @@ -0,0 +1,82 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.Message; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 09-Dec-15. + * + * This class specifies the job that is required of a Bulletin Board Client Worker + */ +public class BulletinClientJob { + + public static enum JobType{ + POST_MESSAGE, // Post a message to servers + READ_MESSAGES, // Read messages according to some given filter (any server will do) + GET_REDUNDANCY // Check the redundancy of a specific message in the databases + } + + private List serverAddresses; + + private int minServers; // The minimal number of servers the job must be successful on for the job to be completed + + private final JobType jobType; + + private final Message payload; // The information associated with the job type + + private int maxRetry; // Number of retries for this job; set to -1 for infinite retries + + public BulletinClientJob(List serverAddresses, int minServers, JobType jobType, Message payload, int maxRetry) { + this.serverAddresses = serverAddresses; + this.minServers = minServers; + this.jobType = jobType; + this.payload = payload; + this.maxRetry = maxRetry; + } + + public List getServerAddresses() { + return serverAddresses; + } + + public int getMinServers() { + return minServers; + } + + public JobType getJobType() { + return jobType; + } + + public Message getPayload() { + return payload; + } + + public int getMaxRetry() { + return maxRetry; + } + + public Iterator getAddressIterator() { + return serverAddresses.iterator(); + } + + public void shuffleAddresses() { + Collections.shuffle(serverAddresses); + } + + public void decMinServers(){ + minServers--; + } + + public void decMaxRetry(){ + if (maxRetry > 0) { + maxRetry--; + } + } + + public boolean isRetry(){ + return (maxRetry != 0); + } + +} \ No newline at end of file diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java new file mode 100644 index 0000000..be0501b --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientJobResult.java @@ -0,0 +1,29 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.Message; + +/** + * Created by Arbel Deutsch Peled on 09-Dec-15. + * + * This class contains the end status and result of a Bulletin Board Client Job. + */ +public final class BulletinClientJobResult { + + private final BulletinClientJob job; // Stores the job the result refers to + + private final Message result; // The result of the job; valid only if success==true + + public BulletinClientJobResult(BulletinClientJob job, Message result) { + this.job = job; + this.result = result; + } + + public BulletinClientJob getJob() { + return job; + } + + public Message getResult() { + return result; + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java new file mode 100644 index 0000000..eb67672 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/BulletinClientWorker.java @@ -0,0 +1,213 @@ +package meerkat.bulletinboard; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import meerkat.comm.CommunicationException; +import meerkat.crypto.Digest; +import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.rest.Constants; +import meerkat.rest.ProtobufMessageBodyReader; +import meerkat.rest.ProtobufMessageBodyWriter; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.util.Iterator; +import java.util.concurrent.Callable; + +/** + * Created by Arbel Deutsch Peled on 09-Dec-15. + * + * This class implements the actual communication with the Bulletin Board Servers. + * It is meant to be used in a multi-threaded environment. + */ +//TODO: Maybe make this abstract and inherit from it. +public class BulletinClientWorker implements Callable { + + private final BulletinClientJob job; // The requested job to be handled + + public BulletinClientWorker(BulletinClientJob job){ + this.job = job; + } + + // This resource enabled creation of a single Client per thread. + private static final ThreadLocal clientLocal = + new ThreadLocal () { + @Override protected Client initialValue() { + Client client; + client = ClientBuilder.newClient(); + client.register(ProtobufMessageBodyReader.class); + client.register(ProtobufMessageBodyWriter.class); + + return client; + } + }; + + // This resource enables creation of a single Digest per thread. + private static final ThreadLocal digestLocal = + new ThreadLocal () { + @Override protected Digest initialValue() { + Digest digest; + digest = new SHA256Digest(); //TODO: Make this generic. + + return digest; + } + }; + + /** + * This method carries out the actual communication with the servers via HTTP Post + * It accesses the servers according to the job it received and updates said job as it goes + * The method will only iterate once through the server list, removing servers from the list when they are no longer required + * In a POST_MESSAGE job: successful post to a server results in removing the server from the list + * In a GET_REDUNDANCY job: no server is removed from the list and the (absolute) number of servers in which the message was found is returned + * In a READ_MESSAGES job: successful retrieval from any server terminates the method and returns the received values; The list is not changed + * @return The original job, modified to fit the current state and the required output (if any) of the operation + * @throws IllegalArgumentException + * @throws CommunicationException + */ + public BulletinClientJobResult call() throws IllegalArgumentException, CommunicationException{ + + Client client = clientLocal.get(); + Digest digest = digestLocal.get(); + + WebTarget webTarget; + Response response; + + job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations + + String requestPath; + Message msg; + + BulletinBoardMessageList msgList; + + int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request. + + // Prepare the request. + switch(job.getJobType()) { + + case POST_MESSAGE: + // Make sure the payload is a BulletinBoardMessage + if (!(job.getPayload() instanceof BulletinBoardMessage)) { + throw new IllegalArgumentException("Cannot post an object that is not an instance of BulletinBoardMessage"); + } + + msg = job.getPayload(); + requestPath = Constants.POST_MESSAGE_PATH; + break; + + case READ_MESSAGES: + // Make sure the payload is a MessageFilterList + if (!(job.getPayload() instanceof MessageFilterList)) { + throw new IllegalArgumentException("Read failed: an instance of MessageFilterList is required as payload for a READ_MESSAGES operation"); + } + + msg = job.getPayload(); + requestPath = Constants.READ_MESSAGES_PATH; + break; + + case GET_REDUNDANCY: + // Make sure the payload is a BulletinBoardMessage + if (!(job.getPayload() instanceof BulletinBoardMessage)) { + throw new IllegalArgumentException("Cannot search for an object that is not an instance of BulletinBoardMessage"); + } + + requestPath = Constants.READ_MESSAGES_PATH; + + // Create a MsgID from the + digest.update((BulletinBoardMessage) job.getPayload()); + msg = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(ByteString.copyFrom(digest.digest())) + .build() + ).build(); + + break; + + default: + throw new IllegalArgumentException("Unsupported job type"); + + } + + // Iterate through servers + + Iterator addressIterator = job.getAddressIterator(); + + while (addressIterator.hasNext()) { + + // Send request to Server + String address = addressIterator.next(); + webTarget = client.target(address).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(requestPath); + response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF)); + + // Retrieve answer + switch(job.getJobType()) { + + case POST_MESSAGE: + try { + + response.readEntity(BoolMsg.class); // If a BoolMsg entity is returned: the post was successful + addressIterator.remove(); // Post to this server succeeded: remove server from list + job.decMinServers(); + + } catch (ProcessingException | IllegalStateException e) {} // Post to this server failed: retry next time + finally { + response.close(); + } + break; + + case GET_REDUNDANCY: + try { + msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful + + if (msgList.getMessageList().size() > 0){ // Message was found in the server. + count++; + } + } catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server + finally { + response.close(); + } + break; + + case READ_MESSAGES: + try { + msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful + return new BulletinClientJobResult(job, msgList); // Return the result + } catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server + finally { + response.close(); + } + break; + + } + + } + + // Return result (if haven't done so yet) + switch(job.getJobType()) { + + case POST_MESSAGE: + // The job now contains the information required to ascertain whether enough server posts have succeeded + // It also contains the list of servers in which the post was not successful + return new BulletinClientJobResult(job, null); + + case GET_REDUNDANCY: + // Return the number of servers in which the message was found + // The job now contains the list of these servers + return new BulletinClientJobResult(job, IntMsg.newBuilder().setValue(count).build()); + + case READ_MESSAGES: + // A successful operation would have already returned an output + // Therefore: no server access was successful + throw new CommunicationException("Could not access any server"); + + default: // This is required for successful compilation + throw new IllegalArgumentException("Unsupported job type"); + + } + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java index 9cf6dd4..f340cae 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardClient.java @@ -5,6 +5,8 @@ import meerkat.comm.CommunicationException; import meerkat.crypto.Digest; import meerkat.crypto.concrete.SHA256Digest; import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting; +import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.rest.*; import java.util.List; @@ -18,11 +20,7 @@ import javax.ws.rs.core.Response; /** * Created by Arbel Deutsch Peled on 05-Dec-15. */ -public class SimpleBulletinBoardClient implements BulletinBoardClient { - - //TODO: Make this general - private static String SQL_SERVER_POST = "sqlserver/postmessage"; - private static String SQL_SERVER_GET = "sqlserver/readmessages"; +public class SimpleBulletinBoardClient{ //implements BulletinBoardClient { private List meerkatDBs; @@ -32,12 +30,12 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { /** * Stores database locations and initializes the web Client - * @param meerkatDBs is the list of database locations + * @param clientParams contains the data needed to access the DBs */ - @Override - public void init(List meerkatDBs) { +// @Override + public void init(Voting.BulletinBoardClientParams clientParams) { - this.meerkatDBs = meerkatDBs; + meerkatDBs = clientParams.getBulletinBoardAddressList(); client = ClientBuilder.newClient(); client.register(ProtobufMessageBodyReader.class); @@ -54,7 +52,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { * @return the message ID for later retrieval * @throws CommunicationException */ - @Override +// @Override public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { WebTarget webTarget; @@ -63,7 +61,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { // Post message to all databases try { for (String db : meerkatDBs) { - webTarget = client.target(db).path(SQL_SERVER_POST); + webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.POST_MESSAGE_PATH); response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF)); // Only consider valid responses @@ -90,7 +88,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { * @param id is the requested message ID * @return the number of DBs in which retrieval was successful */ - @Override +// @Override public float getRedundancy(MessageID id) { WebTarget webTarget; Response response; @@ -106,7 +104,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { for (String db : meerkatDBs) { try { - webTarget = client.target(db).path(SQL_SERVER_GET); + webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH); response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF)); @@ -127,7 +125,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { * @param filterList return only messages that match the filters (null means no filtering). * @return */ - @Override +// @Override public List readMessages(MessageFilterList filterList) { WebTarget webTarget; Response response; @@ -140,11 +138,11 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { for (String db : meerkatDBs) { try { - webTarget = client.target(db).path(SQL_SERVER_GET); + webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH); response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF)); - messageList =response.readEntity(BulletinBoardMessageList.class); + messageList = response.readEntity(BulletinBoardMessageList.class); if (messageList != null){ return messageList.getMessageList(); @@ -156,8 +154,8 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient { return null; } - @Override - public void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList) { - callback.handleNewMessage(readMessages(filterList)); - } +// @Override +// public void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList) { +// callback.handleNewMessage(readMessages(filterList)); +// } } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java new file mode 100644 index 0000000..dd1ab0f --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardClient.java @@ -0,0 +1,118 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.*; +import com.google.protobuf.ByteString; +import meerkat.bulletinboard.callbacks.GetRedundancyFutureCallback; +import meerkat.bulletinboard.callbacks.PostMessageFutureCallback; +import meerkat.bulletinboard.callbacks.ReadMessagesFutureCallback; +import meerkat.comm.CommunicationException; +import meerkat.crypto.Digest; +import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting; + +import java.util.List; +import java.util.concurrent.Executors; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + * Thread-based implementation of a Bulletin Board Client. + * Features: + * 1. Handles tasks concurrently. + * 2. Retries submitting + */ +public class ThreadedBulletinBoardClient implements BulletinBoardClient { + + private final static int THREAD_NUM = 10; + ListeningExecutorService listeningExecutor; + + private Digest digest; + + private List meerkatDBs; + private String postSubAddress; + private String readSubAddress; + + private final static int READ_MESSAGES_RETRY_NUM = 1; + + private int minAbsoluteRedundancy; + + /** + * Stores database locations and initializes the web Client + * Stores the required minimum redundancy. + * Starts the Thread Pool. + * @param clientParams contains the required information + */ + @Override + public void init(Voting.BulletinBoardClientParams clientParams) { + + meerkatDBs = clientParams.getBulletinBoardAddressList(); + + minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * meerkatDBs.size()); + + listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM)); + + digest = new SHA256Digest(); + + } + + /** + * Post message to all DBs + * Retry failed DBs + * @param msg is the message, + * @return the message ID for later retrieval + * @throws CommunicationException + */ + @Override + public MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback){ + + // Create job + BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.POST_MESSAGE, msg, -1); + + // Submit job and create callback + Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new PostMessageFutureCallback(listeningExecutor, callback)); + + // Calculate the correct message ID and return it + digest.reset(); + digest.update(msg.getMsg()); + return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build(); + } + + /** + * Access each database and search for a given message ID + * Return the number of databases in which the message was found + * Only try once per DB + * Ignore communication exceptions in specific databases + * @param id is the requested message ID + * @return the number of DBs in which retrieval was successful + */ + @Override + public void getRedundancy(MessageID id, ClientCallback callback) { + + // Create job + BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, id, 1); + + // Submit job and create callback + Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new GetRedundancyFutureCallback(listeningExecutor, callback)); + + } + + /** + * Go through the DBs and try to retrieve messages according to the specified filter + * If at the operation is successful for some DB: return the results and stop iterating + * If no operation is successful: return null (NOT blank list) + * @param filterList return only messages that match the filters (null means no filtering). + * @return + */ + @Override + public void readMessages(MessageFilterList filterList, ClientCallback> callback) { + + // Create job + BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, + filterList, READ_MESSAGES_RETRY_NUM); + + // Submit job and create callback + Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new ReadMessagesFutureCallback(listeningExecutor, callback)); + + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java new file mode 100644 index 0000000..7c5b7b0 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ClientFutureCallback.java @@ -0,0 +1,25 @@ +package meerkat.bulletinboard.callbacks; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import meerkat.bulletinboard.BulletinClientJob; +import meerkat.bulletinboard.BulletinClientJobResult; +import meerkat.bulletinboard.BulletinClientWorker; +import meerkat.protobuf.BulletinBoardAPI; + +import java.util.List; + +/** + * This is a future callback used to listen to workers and run on job finish + * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error + */ +public abstract class ClientFutureCallback implements FutureCallback { + + protected ListeningExecutorService listeningExecutor; + + ClientFutureCallback(ListeningExecutorService listeningExecutor) { + this.listeningExecutor = listeningExecutor; + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java new file mode 100644 index 0000000..518ed77 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/GetRedundancyFutureCallback.java @@ -0,0 +1,38 @@ +package meerkat.bulletinboard.callbacks; + +import com.google.common.util.concurrent.ListeningExecutorService; +import meerkat.bulletinboard.BulletinBoardClient; +import meerkat.bulletinboard.BulletinClientJobResult; +import meerkat.protobuf.BulletinBoardAPI.*; + +import java.util.List; + +/** + * This is a future callback used to listen to workers and run on job finish + * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error + */ +public class GetRedundancyFutureCallback extends ClientFutureCallback { + + private BulletinBoardClient.ClientCallback callback; + + public GetRedundancyFutureCallback(ListeningExecutorService listeningExecutor, + BulletinBoardClient.ClientCallback callback) { + super(listeningExecutor); + this.callback = callback; + } + + @Override + public void onSuccess(BulletinClientJobResult result) { + + int absoluteRedundancy = ((IntMsg) result.getResult()).getValue(); + int totalServers = result.getJob().getServerAddresses().size(); + + callback.handleCallback( ((float) absoluteRedundancy) / ((float) totalServers) ); + + } + + @Override + public void onFailure(Throwable t) { + callback.handleFailure(t); + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java new file mode 100644 index 0000000..7e2a855 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/PostMessageFutureCallback.java @@ -0,0 +1,44 @@ +package meerkat.bulletinboard.callbacks; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import meerkat.bulletinboard.BulletinBoardClient; +import meerkat.bulletinboard.BulletinClientJob; +import meerkat.bulletinboard.BulletinClientJobResult; +import meerkat.bulletinboard.BulletinClientWorker; +import meerkat.protobuf.BulletinBoardAPI; + +import java.util.List; + +/** + * This is a future callback used to listen to workers and run on job finish + * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error + */ +public class PostMessageFutureCallback extends ClientFutureCallback { + + private BulletinBoardClient.ClientCallback callback; + + public PostMessageFutureCallback(ListeningExecutorService listeningExecutor, + BulletinBoardClient.ClientCallback callback) { + super(listeningExecutor); + this.callback = callback; + } + + @Override + public void onSuccess(BulletinClientJobResult result) { + + BulletinClientJob job = result.getJob(); + + job.decMaxRetry(); + + // If redundancy is below threshold: retry + if (job.getMinServers() > 0 && job.isRetry()) { + Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), this); + } + } + + @Override + public void onFailure(Throwable t) { + callback.handleFailure(t); + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java new file mode 100644 index 0000000..4c43ba2 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/callbacks/ReadMessagesFutureCallback.java @@ -0,0 +1,38 @@ +package meerkat.bulletinboard.callbacks; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import meerkat.bulletinboard.BulletinBoardClient; +import meerkat.bulletinboard.BulletinClientJob; +import meerkat.bulletinboard.BulletinClientJobResult; +import meerkat.bulletinboard.BulletinClientWorker; +import meerkat.protobuf.BulletinBoardAPI; + +import java.util.List; + +/** + * This is a future callback used to listen to workers and run on job finish + * Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error + */ +public class ReadMessagesFutureCallback extends ClientFutureCallback { + + private BulletinBoardClient.ClientCallback> callback; + + public ReadMessagesFutureCallback(ListeningExecutorService listeningExecutor, + BulletinBoardClient.ClientCallback> callback) { + super(listeningExecutor); + this.callback = callback; + } + + @Override + public void onSuccess(BulletinClientJobResult result) { + + callback.handleCallback(((BulletinBoardAPI.BulletinBoardMessageList) result.getResult()).getMessageList()); + + } + + @Override + public void onFailure(Throwable t) { + callback.handleFailure(t); + } +} diff --git a/bulletin-board-server/build.gradle b/bulletin-board-server/build.gradle index 5989499..7adda9d 100644 --- a/bulletin-board-server/build.gradle +++ b/bulletin-board-server/build.gradle @@ -72,7 +72,9 @@ dependencies { test { + exclude '**/*SQLite*Test*' exclude '**/*IntegrationTest*' + outputs.upToDateWhen { false } } task integrationTest(type: Test) { diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java index 5dae671..1a8abaf 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/webapp/BulletinBoardWebApp.java @@ -21,7 +21,7 @@ import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList; import meerkat.protobuf.BulletinBoardAPI.MessageFilterList; import meerkat.rest.Constants; -@Path("/sqlserver") +@Path(Constants.BULLETIN_BOARD_SERVER_PATH) public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextListener{ private static final String BULLETIN_BOARD_ATTRIBUTE_NAME = "bulletinBoard"; @@ -63,7 +63,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL } } - @Path("postmessage") + @Path(Constants.POST_MESSAGE_PATH) @POST @Consumes(Constants.MEDIATYPE_PROTOBUF) @Produces(Constants.MEDIATYPE_PROTOBUF) @@ -73,7 +73,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL return bulletinBoard.postMessage(msg); } - @Path("readmessages") + @Path(Constants.READ_MESSAGES_PATH) @POST @Consumes(Constants.MEDIATYPE_PROTOBUF) @Produces(Constants.MEDIATYPE_PROTOBUF) diff --git a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteServerIntegrationTest.java b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteServerIntegrationTest.java index 95c916d..2a4d6d9 100644 --- a/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteServerIntegrationTest.java +++ b/bulletin-board-server/src/test/java/meerkat/bulletinboard/SQLiteServerIntegrationTest.java @@ -24,8 +24,6 @@ public class SQLiteServerIntegrationTest { private static String PROP_GETTY_URL = "gretty.httpBaseURI"; private static String DEFAULT_BASE_URL = "http://localhost:8081"; private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); - private static String SQL_SERVER_POST = "sqlserver/postmessage"; - private static String SQL_SERVER_GET = "sqlserver/readmessages"; Client client; // Connection connection; @@ -64,11 +62,8 @@ public class SQLiteServerIntegrationTest { // Test writing mechanism - System.err.println("******** Testing: " + SQL_SERVER_POST); - System.err.println(BASE_URL); - System.err.println(SQL_SERVER_POST); - System.err.println(client.getConfiguration()); - webTarget = client.target(BASE_URL).path(SQL_SERVER_POST); + System.err.println("******** Testing: " + Constants.POST_MESSAGE_PATH); + webTarget = client.target(BASE_URL).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.POST_MESSAGE_PATH); System.err.println(webTarget.getUri()); msg = BulletinBoardMessage.newBuilder() @@ -114,9 +109,8 @@ public class SQLiteServerIntegrationTest { // Test reading mechanism - System.err.println("******** Testing: " + SQL_SERVER_GET); - webTarget = client.target(BASE_URL).path(SQL_SERVER_GET); - + System.err.println("******** Testing: " + Constants.READ_MESSAGES_PATH); + webTarget = client.target(BASE_URL).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH); filterList = MessageFilterList.newBuilder() .addFilter( MessageFilter.newBuilder() diff --git a/meerkat-common/build.gradle b/meerkat-common/build.gradle index d2fe0fd..6aa93cb 100644 --- a/meerkat-common/build.gradle +++ b/meerkat-common/build.gradle @@ -45,6 +45,9 @@ dependencies { // Google protobufs compile 'com.google.protobuf:protobuf-java:3.+' + // ListeningExecutor + compile 'com.google.guava:guava:11.0.+' + // Crypto compile 'org.factcenter.qilin:qilin:1.1+' compile 'org.bouncycastle:bcprov-jdk15on:1.53' diff --git a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java index 2e466b3..1577527 100644 --- a/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java +++ b/meerkat-common/src/main/java/meerkat/bulletinboard/BulletinBoardClient.java @@ -1,6 +1,8 @@ package meerkat.bulletinboard; import meerkat.comm.*; +import meerkat.protobuf.Voting.*; + import static meerkat.protobuf.BulletinBoardAPI.*; import java.util.List; @@ -10,24 +12,29 @@ import java.util.List; */ public interface BulletinBoardClient { + interface ClientCallback { + void handleCallback(T msg); + void handleFailure(Throwable t); + } + /** * Initialize the client to use some specified servers - * @param meerkatDBs is the list of database locations + * @param clientParams contains the parameters required for the client setup */ - void init(List meerkatDBs); + void init(BulletinBoardClientParams clientParams); /** * Post a message to the bulletin board * @param msg */ - MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException; + MessageID postMessage(BulletinBoardMessage msg, ClientCallback callback); /** * Check how "safe" a given message is * @param id * @return a normalized "redundancy score" from 0 (local only) to 1 (fully published) */ - float getRedundancy(MessageID id); + void getRedundancy(MessageID id, ClientCallback callback); /** * Read all messages posted matching the given filter @@ -35,20 +42,7 @@ public interface BulletinBoardClient { * set of messages in different calls. However, messages that are fully posted * are guaranteed to be included. * @param filterList return only messages that match the filters (null means no filtering). - * @return */ - List readMessages(MessageFilterList filterList); - - interface MessageCallback { - void handleNewMessage(List msg); - } - - /** - * Register a callback that will be called with each new message that is posted. - * The callback will be called only once for each message. - * @param callback - * @param filterList only call back for messages that match the filter. - */ - void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList); + void readMessages(MessageFilterList filterList, ClientCallback> callback); } diff --git a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto index 28fc948..4830afc 100644 --- a/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto +++ b/meerkat-common/src/main/proto/meerkat/BulletinBoardAPI.proto @@ -10,6 +10,9 @@ message BoolMsg { bool value = 1; } +message IntMsg { + int32 value = 1; +} message MessageID { // The ID of a message for unique retrieval. diff --git a/meerkat-common/src/main/proto/meerkat/voting.proto b/meerkat-common/src/main/proto/meerkat/voting.proto index beb4e0c..9837cce 100644 --- a/meerkat-common/src/main/proto/meerkat/voting.proto +++ b/meerkat-common/src/main/proto/meerkat/voting.proto @@ -52,6 +52,16 @@ message BallotAnswerTranslationTable { bytes data = 1; } +// Data required in order to access the Bulletin Board Servers +message BulletinBoardClientParams { + + // Addresses of all Bulletin Board Servers + repeated string bulletinBoardAddress = 1; + + // Threshold fraction of successful servers posts before a post task is considered complete + float minRedundancy = 2; +} + message ElectionParams { // TODO: different sets of keys for different roles? repeated SignatureVerificationKey trusteeVerificationKeys = 1; @@ -75,4 +85,6 @@ message ElectionParams { // Translation table between answers and plaintext encoding BallotAnswerTranslationTable answerTranslationTable = 7; + // Data required in order to access the Bulletin Board Servers + BulletinBoardClientParams bulletinBoardClientParams = 8; } diff --git a/restful-api-common/src/main/java/meerkat/rest/Constants.java b/restful-api-common/src/main/java/meerkat/rest/Constants.java index 2c04248..73ed7d1 100644 --- a/restful-api-common/src/main/java/meerkat/rest/Constants.java +++ b/restful-api-common/src/main/java/meerkat/rest/Constants.java @@ -5,4 +5,8 @@ package meerkat.rest; */ public interface Constants { public static final String MEDIATYPE_PROTOBUF = "application/x-protobuf"; + + public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver"; + public static final String READ_MESSAGES_PATH = "/readmessages"; + public static final String POST_MESSAGE_PATH = "/postmessage"; }