Created (untested version of) a Threaded Bulletin Board Client.
Overhauled Bulletin Board Client interface to accommodate this. Deprecated the Simple Bulletin Board Client. Made the path to the server methods generic (defined in the Constants class of the rest package).Bulletin-Board-Client-phase_1
parent
76c5e6681f
commit
3de54f16a2
|
@ -0,0 +1,82 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 09-Dec-15.
|
||||
*
|
||||
* This class specifies the job that is required of a Bulletin Board Client Worker
|
||||
*/
|
||||
public class BulletinClientJob {
|
||||
|
||||
public static enum JobType{
|
||||
POST_MESSAGE, // Post a message to servers
|
||||
READ_MESSAGES, // Read messages according to some given filter (any server will do)
|
||||
GET_REDUNDANCY // Check the redundancy of a specific message in the databases
|
||||
}
|
||||
|
||||
private List<String> serverAddresses;
|
||||
|
||||
private int minServers; // The minimal number of servers the job must be successful on for the job to be completed
|
||||
|
||||
private final JobType jobType;
|
||||
|
||||
private final Message payload; // The information associated with the job type
|
||||
|
||||
private int maxRetry; // Number of retries for this job; set to -1 for infinite retries
|
||||
|
||||
public BulletinClientJob(List<String> serverAddresses, int minServers, JobType jobType, Message payload, int maxRetry) {
|
||||
this.serverAddresses = serverAddresses;
|
||||
this.minServers = minServers;
|
||||
this.jobType = jobType;
|
||||
this.payload = payload;
|
||||
this.maxRetry = maxRetry;
|
||||
}
|
||||
|
||||
public List<String> getServerAddresses() {
|
||||
return serverAddresses;
|
||||
}
|
||||
|
||||
public int getMinServers() {
|
||||
return minServers;
|
||||
}
|
||||
|
||||
public JobType getJobType() {
|
||||
return jobType;
|
||||
}
|
||||
|
||||
public Message getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
public int getMaxRetry() {
|
||||
return maxRetry;
|
||||
}
|
||||
|
||||
public Iterator<String> getAddressIterator() {
|
||||
return serverAddresses.iterator();
|
||||
}
|
||||
|
||||
public void shuffleAddresses() {
|
||||
Collections.shuffle(serverAddresses);
|
||||
}
|
||||
|
||||
public void decMinServers(){
|
||||
minServers--;
|
||||
}
|
||||
|
||||
public void decMaxRetry(){
|
||||
if (maxRetry > 0) {
|
||||
maxRetry--;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isRetry(){
|
||||
return (maxRetry != 0);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 09-Dec-15.
|
||||
*
|
||||
* This class contains the end status and result of a Bulletin Board Client Job.
|
||||
*/
|
||||
public final class BulletinClientJobResult {
|
||||
|
||||
private final BulletinClientJob job; // Stores the job the result refers to
|
||||
|
||||
private final Message result; // The result of the job; valid only if success==true
|
||||
|
||||
public BulletinClientJobResult(BulletinClientJob job, Message result) {
|
||||
this.job = job;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public BulletinClientJob getJob() {
|
||||
return job;
|
||||
}
|
||||
|
||||
public Message getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,213 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Message;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.crypto.Digest;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.rest.Constants;
|
||||
import meerkat.rest.ProtobufMessageBodyReader;
|
||||
import meerkat.rest.ProtobufMessageBodyWriter;
|
||||
|
||||
import javax.ws.rs.ProcessingException;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 09-Dec-15.
|
||||
*
|
||||
* This class implements the actual communication with the Bulletin Board Servers.
|
||||
* It is meant to be used in a multi-threaded environment.
|
||||
*/
|
||||
//TODO: Maybe make this abstract and inherit from it.
|
||||
public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
|
||||
|
||||
private final BulletinClientJob job; // The requested job to be handled
|
||||
|
||||
public BulletinClientWorker(BulletinClientJob job){
|
||||
this.job = job;
|
||||
}
|
||||
|
||||
// This resource enabled creation of a single Client per thread.
|
||||
private static final ThreadLocal<Client> clientLocal =
|
||||
new ThreadLocal<Client> () {
|
||||
@Override protected Client initialValue() {
|
||||
Client client;
|
||||
client = ClientBuilder.newClient();
|
||||
client.register(ProtobufMessageBodyReader.class);
|
||||
client.register(ProtobufMessageBodyWriter.class);
|
||||
|
||||
return client;
|
||||
}
|
||||
};
|
||||
|
||||
// This resource enables creation of a single Digest per thread.
|
||||
private static final ThreadLocal<Digest> digestLocal =
|
||||
new ThreadLocal<Digest> () {
|
||||
@Override protected Digest initialValue() {
|
||||
Digest digest;
|
||||
digest = new SHA256Digest(); //TODO: Make this generic.
|
||||
|
||||
return digest;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* This method carries out the actual communication with the servers via HTTP Post
|
||||
* It accesses the servers according to the job it received and updates said job as it goes
|
||||
* The method will only iterate once through the server list, removing servers from the list when they are no longer required
|
||||
* In a POST_MESSAGE job: successful post to a server results in removing the server from the list
|
||||
* In a GET_REDUNDANCY job: no server is removed from the list and the (absolute) number of servers in which the message was found is returned
|
||||
* In a READ_MESSAGES job: successful retrieval from any server terminates the method and returns the received values; The list is not changed
|
||||
* @return The original job, modified to fit the current state and the required output (if any) of the operation
|
||||
* @throws IllegalArgumentException
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
public BulletinClientJobResult call() throws IllegalArgumentException, CommunicationException{
|
||||
|
||||
Client client = clientLocal.get();
|
||||
Digest digest = digestLocal.get();
|
||||
|
||||
WebTarget webTarget;
|
||||
Response response;
|
||||
|
||||
job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations
|
||||
|
||||
String requestPath;
|
||||
Message msg;
|
||||
|
||||
BulletinBoardMessageList msgList;
|
||||
|
||||
int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request.
|
||||
|
||||
// Prepare the request.
|
||||
switch(job.getJobType()) {
|
||||
|
||||
case POST_MESSAGE:
|
||||
// Make sure the payload is a BulletinBoardMessage
|
||||
if (!(job.getPayload() instanceof BulletinBoardMessage)) {
|
||||
throw new IllegalArgumentException("Cannot post an object that is not an instance of BulletinBoardMessage");
|
||||
}
|
||||
|
||||
msg = job.getPayload();
|
||||
requestPath = Constants.POST_MESSAGE_PATH;
|
||||
break;
|
||||
|
||||
case READ_MESSAGES:
|
||||
// Make sure the payload is a MessageFilterList
|
||||
if (!(job.getPayload() instanceof MessageFilterList)) {
|
||||
throw new IllegalArgumentException("Read failed: an instance of MessageFilterList is required as payload for a READ_MESSAGES operation");
|
||||
}
|
||||
|
||||
msg = job.getPayload();
|
||||
requestPath = Constants.READ_MESSAGES_PATH;
|
||||
break;
|
||||
|
||||
case GET_REDUNDANCY:
|
||||
// Make sure the payload is a BulletinBoardMessage
|
||||
if (!(job.getPayload() instanceof BulletinBoardMessage)) {
|
||||
throw new IllegalArgumentException("Cannot search for an object that is not an instance of BulletinBoardMessage");
|
||||
}
|
||||
|
||||
requestPath = Constants.READ_MESSAGES_PATH;
|
||||
|
||||
// Create a MsgID from the
|
||||
digest.update((BulletinBoardMessage) job.getPayload());
|
||||
msg = MessageFilterList.newBuilder()
|
||||
.addFilter(MessageFilter.newBuilder()
|
||||
.setType(FilterType.MSG_ID)
|
||||
.setId(ByteString.copyFrom(digest.digest()))
|
||||
.build()
|
||||
).build();
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported job type");
|
||||
|
||||
}
|
||||
|
||||
// Iterate through servers
|
||||
|
||||
Iterator<String> addressIterator = job.getAddressIterator();
|
||||
|
||||
while (addressIterator.hasNext()) {
|
||||
|
||||
// Send request to Server
|
||||
String address = addressIterator.next();
|
||||
webTarget = client.target(address).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(requestPath);
|
||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
// Retrieve answer
|
||||
switch(job.getJobType()) {
|
||||
|
||||
case POST_MESSAGE:
|
||||
try {
|
||||
|
||||
response.readEntity(BoolMsg.class); // If a BoolMsg entity is returned: the post was successful
|
||||
addressIterator.remove(); // Post to this server succeeded: remove server from list
|
||||
job.decMinServers();
|
||||
|
||||
} catch (ProcessingException | IllegalStateException e) {} // Post to this server failed: retry next time
|
||||
finally {
|
||||
response.close();
|
||||
}
|
||||
break;
|
||||
|
||||
case GET_REDUNDANCY:
|
||||
try {
|
||||
msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful
|
||||
|
||||
if (msgList.getMessageList().size() > 0){ // Message was found in the server.
|
||||
count++;
|
||||
}
|
||||
} catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server
|
||||
finally {
|
||||
response.close();
|
||||
}
|
||||
break;
|
||||
|
||||
case READ_MESSAGES:
|
||||
try {
|
||||
msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful
|
||||
return new BulletinClientJobResult(job, msgList); // Return the result
|
||||
} catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server
|
||||
finally {
|
||||
response.close();
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Return result (if haven't done so yet)
|
||||
switch(job.getJobType()) {
|
||||
|
||||
case POST_MESSAGE:
|
||||
// The job now contains the information required to ascertain whether enough server posts have succeeded
|
||||
// It also contains the list of servers in which the post was not successful
|
||||
return new BulletinClientJobResult(job, null);
|
||||
|
||||
case GET_REDUNDANCY:
|
||||
// Return the number of servers in which the message was found
|
||||
// The job now contains the list of these servers
|
||||
return new BulletinClientJobResult(job, IntMsg.newBuilder().setValue(count).build());
|
||||
|
||||
case READ_MESSAGES:
|
||||
// A successful operation would have already returned an output
|
||||
// Therefore: no server access was successful
|
||||
throw new CommunicationException("Could not access any server");
|
||||
|
||||
default: // This is required for successful compilation
|
||||
throw new IllegalArgumentException("Unsupported job type");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,6 +5,8 @@ import meerkat.comm.CommunicationException;
|
|||
import meerkat.crypto.Digest;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Voting;
|
||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||
import meerkat.rest.*;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -18,11 +20,7 @@ import javax.ws.rs.core.Response;
|
|||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
*/
|
||||
public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
||||
|
||||
//TODO: Make this general
|
||||
private static String SQL_SERVER_POST = "sqlserver/postmessage";
|
||||
private static String SQL_SERVER_GET = "sqlserver/readmessages";
|
||||
public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
||||
|
||||
private List<String> meerkatDBs;
|
||||
|
||||
|
@ -32,12 +30,12 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
|
||||
/**
|
||||
* Stores database locations and initializes the web Client
|
||||
* @param meerkatDBs is the list of database locations
|
||||
* @param clientParams contains the data needed to access the DBs
|
||||
*/
|
||||
@Override
|
||||
public void init(List<String> meerkatDBs) {
|
||||
// @Override
|
||||
public void init(Voting.BulletinBoardClientParams clientParams) {
|
||||
|
||||
this.meerkatDBs = meerkatDBs;
|
||||
meerkatDBs = clientParams.getBulletinBoardAddressList();
|
||||
|
||||
client = ClientBuilder.newClient();
|
||||
client.register(ProtobufMessageBodyReader.class);
|
||||
|
@ -54,7 +52,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
* @return the message ID for later retrieval
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
@Override
|
||||
// @Override
|
||||
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
|
||||
|
||||
WebTarget webTarget;
|
||||
|
@ -63,7 +61,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
// Post message to all databases
|
||||
try {
|
||||
for (String db : meerkatDBs) {
|
||||
webTarget = client.target(db).path(SQL_SERVER_POST);
|
||||
webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.POST_MESSAGE_PATH);
|
||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
// Only consider valid responses
|
||||
|
@ -90,7 +88,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
* @param id is the requested message ID
|
||||
* @return the number of DBs in which retrieval was successful
|
||||
*/
|
||||
@Override
|
||||
// @Override
|
||||
public float getRedundancy(MessageID id) {
|
||||
WebTarget webTarget;
|
||||
Response response;
|
||||
|
@ -106,7 +104,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
|
||||
for (String db : meerkatDBs) {
|
||||
try {
|
||||
webTarget = client.target(db).path(SQL_SERVER_GET);
|
||||
webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH);
|
||||
|
||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
|
@ -127,7 +125,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
// @Override
|
||||
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
|
||||
WebTarget webTarget;
|
||||
Response response;
|
||||
|
@ -140,11 +138,11 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
|
||||
for (String db : meerkatDBs) {
|
||||
try {
|
||||
webTarget = client.target(db).path(SQL_SERVER_GET);
|
||||
webTarget = client.target(db).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH);
|
||||
|
||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF));
|
||||
|
||||
messageList =response.readEntity(BulletinBoardMessageList.class);
|
||||
messageList = response.readEntity(BulletinBoardMessageList.class);
|
||||
|
||||
if (messageList != null){
|
||||
return messageList.getMessageList();
|
||||
|
@ -156,8 +154,8 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList) {
|
||||
callback.handleNewMessage(readMessages(filterList));
|
||||
}
|
||||
// @Override
|
||||
// public void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList) {
|
||||
// callback.handleNewMessage(readMessages(filterList));
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import com.google.common.util.concurrent.*;
|
||||
import com.google.protobuf.ByteString;
|
||||
import meerkat.bulletinboard.callbacks.GetRedundancyFutureCallback;
|
||||
import meerkat.bulletinboard.callbacks.PostMessageFutureCallback;
|
||||
import meerkat.bulletinboard.callbacks.ReadMessagesFutureCallback;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.crypto.Digest;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Voting;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
* Thread-based implementation of a Bulletin Board Client.
|
||||
* Features:
|
||||
* 1. Handles tasks concurrently.
|
||||
* 2. Retries submitting
|
||||
*/
|
||||
public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
||||
|
||||
private final static int THREAD_NUM = 10;
|
||||
ListeningExecutorService listeningExecutor;
|
||||
|
||||
private Digest digest;
|
||||
|
||||
private List<String> meerkatDBs;
|
||||
private String postSubAddress;
|
||||
private String readSubAddress;
|
||||
|
||||
private final static int READ_MESSAGES_RETRY_NUM = 1;
|
||||
|
||||
private int minAbsoluteRedundancy;
|
||||
|
||||
/**
|
||||
* Stores database locations and initializes the web Client
|
||||
* Stores the required minimum redundancy.
|
||||
* Starts the Thread Pool.
|
||||
* @param clientParams contains the required information
|
||||
*/
|
||||
@Override
|
||||
public void init(Voting.BulletinBoardClientParams clientParams) {
|
||||
|
||||
meerkatDBs = clientParams.getBulletinBoardAddressList();
|
||||
|
||||
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * meerkatDBs.size());
|
||||
|
||||
listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM));
|
||||
|
||||
digest = new SHA256Digest();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Post message to all DBs
|
||||
* Retry failed DBs
|
||||
* @param msg is the message,
|
||||
* @return the message ID for later retrieval
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
@Override
|
||||
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback){
|
||||
|
||||
// Create job
|
||||
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.POST_MESSAGE, msg, -1);
|
||||
|
||||
// Submit job and create callback
|
||||
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new PostMessageFutureCallback(listeningExecutor, callback));
|
||||
|
||||
// Calculate the correct message ID and return it
|
||||
digest.reset();
|
||||
digest.update(msg.getMsg());
|
||||
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Access each database and search for a given message ID
|
||||
* Return the number of databases in which the message was found
|
||||
* Only try once per DB
|
||||
* Ignore communication exceptions in specific databases
|
||||
* @param id is the requested message ID
|
||||
* @return the number of DBs in which retrieval was successful
|
||||
*/
|
||||
@Override
|
||||
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
|
||||
|
||||
// Create job
|
||||
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, id, 1);
|
||||
|
||||
// Submit job and create callback
|
||||
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new GetRedundancyFutureCallback(listeningExecutor, callback));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Go through the DBs and try to retrieve messages according to the specified filter
|
||||
* If at the operation is successful for some DB: return the results and stop iterating
|
||||
* If no operation is successful: return null (NOT blank list)
|
||||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
|
||||
|
||||
// Create job
|
||||
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY,
|
||||
filterList, READ_MESSAGES_RETRY_NUM);
|
||||
|
||||
// Submit job and create callback
|
||||
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new ReadMessagesFutureCallback(listeningExecutor, callback));
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package meerkat.bulletinboard.callbacks;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import meerkat.bulletinboard.BulletinClientJob;
|
||||
import meerkat.bulletinboard.BulletinClientJobResult;
|
||||
import meerkat.bulletinboard.BulletinClientWorker;
|
||||
import meerkat.protobuf.BulletinBoardAPI;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This is a future callback used to listen to workers and run on job finish
|
||||
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
|
||||
*/
|
||||
public abstract class ClientFutureCallback implements FutureCallback<BulletinClientJobResult> {
|
||||
|
||||
protected ListeningExecutorService listeningExecutor;
|
||||
|
||||
ClientFutureCallback(ListeningExecutorService listeningExecutor) {
|
||||
this.listeningExecutor = listeningExecutor;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package meerkat.bulletinboard.callbacks;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import meerkat.bulletinboard.BulletinBoardClient;
|
||||
import meerkat.bulletinboard.BulletinClientJobResult;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This is a future callback used to listen to workers and run on job finish
|
||||
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
|
||||
*/
|
||||
public class GetRedundancyFutureCallback extends ClientFutureCallback {
|
||||
|
||||
private BulletinBoardClient.ClientCallback<Float> callback;
|
||||
|
||||
public GetRedundancyFutureCallback(ListeningExecutorService listeningExecutor,
|
||||
BulletinBoardClient.ClientCallback<Float> callback) {
|
||||
super(listeningExecutor);
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(BulletinClientJobResult result) {
|
||||
|
||||
int absoluteRedundancy = ((IntMsg) result.getResult()).getValue();
|
||||
int totalServers = result.getJob().getServerAddresses().size();
|
||||
|
||||
callback.handleCallback( ((float) absoluteRedundancy) / ((float) totalServers) );
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
callback.handleFailure(t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package meerkat.bulletinboard.callbacks;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import meerkat.bulletinboard.BulletinBoardClient;
|
||||
import meerkat.bulletinboard.BulletinClientJob;
|
||||
import meerkat.bulletinboard.BulletinClientJobResult;
|
||||
import meerkat.bulletinboard.BulletinClientWorker;
|
||||
import meerkat.protobuf.BulletinBoardAPI;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This is a future callback used to listen to workers and run on job finish
|
||||
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
|
||||
*/
|
||||
public class PostMessageFutureCallback extends ClientFutureCallback {
|
||||
|
||||
private BulletinBoardClient.ClientCallback<?> callback;
|
||||
|
||||
public PostMessageFutureCallback(ListeningExecutorService listeningExecutor,
|
||||
BulletinBoardClient.ClientCallback<?> callback) {
|
||||
super(listeningExecutor);
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(BulletinClientJobResult result) {
|
||||
|
||||
BulletinClientJob job = result.getJob();
|
||||
|
||||
job.decMaxRetry();
|
||||
|
||||
// If redundancy is below threshold: retry
|
||||
if (job.getMinServers() > 0 && job.isRetry()) {
|
||||
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
callback.handleFailure(t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package meerkat.bulletinboard.callbacks;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import meerkat.bulletinboard.BulletinBoardClient;
|
||||
import meerkat.bulletinboard.BulletinClientJob;
|
||||
import meerkat.bulletinboard.BulletinClientJobResult;
|
||||
import meerkat.bulletinboard.BulletinClientWorker;
|
||||
import meerkat.protobuf.BulletinBoardAPI;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This is a future callback used to listen to workers and run on job finish
|
||||
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
|
||||
*/
|
||||
public class ReadMessagesFutureCallback extends ClientFutureCallback {
|
||||
|
||||
private BulletinBoardClient.ClientCallback<List<BulletinBoardAPI.BulletinBoardMessage>> callback;
|
||||
|
||||
public ReadMessagesFutureCallback(ListeningExecutorService listeningExecutor,
|
||||
BulletinBoardClient.ClientCallback<List<BulletinBoardAPI.BulletinBoardMessage>> callback) {
|
||||
super(listeningExecutor);
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(BulletinClientJobResult result) {
|
||||
|
||||
callback.handleCallback(((BulletinBoardAPI.BulletinBoardMessageList) result.getResult()).getMessageList());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
callback.handleFailure(t);
|
||||
}
|
||||
}
|
|
@ -72,7 +72,9 @@ dependencies {
|
|||
|
||||
|
||||
test {
|
||||
exclude '**/*SQLite*Test*'
|
||||
exclude '**/*IntegrationTest*'
|
||||
outputs.upToDateWhen { false }
|
||||
}
|
||||
|
||||
task integrationTest(type: Test) {
|
||||
|
|
|
@ -21,7 +21,7 @@ import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList;
|
|||
import meerkat.protobuf.BulletinBoardAPI.MessageFilterList;
|
||||
import meerkat.rest.Constants;
|
||||
|
||||
@Path("/sqlserver")
|
||||
@Path(Constants.BULLETIN_BOARD_SERVER_PATH)
|
||||
public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextListener{
|
||||
|
||||
private static final String BULLETIN_BOARD_ATTRIBUTE_NAME = "bulletinBoard";
|
||||
|
@ -63,7 +63,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
|||
}
|
||||
}
|
||||
|
||||
@Path("postmessage")
|
||||
@Path(Constants.POST_MESSAGE_PATH)
|
||||
@POST
|
||||
@Consumes(Constants.MEDIATYPE_PROTOBUF)
|
||||
@Produces(Constants.MEDIATYPE_PROTOBUF)
|
||||
|
@ -73,7 +73,7 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
|
|||
return bulletinBoard.postMessage(msg);
|
||||
}
|
||||
|
||||
@Path("readmessages")
|
||||
@Path(Constants.READ_MESSAGES_PATH)
|
||||
@POST
|
||||
@Consumes(Constants.MEDIATYPE_PROTOBUF)
|
||||
@Produces(Constants.MEDIATYPE_PROTOBUF)
|
||||
|
|
|
@ -24,8 +24,6 @@ public class SQLiteServerIntegrationTest {
|
|||
private static String PROP_GETTY_URL = "gretty.httpBaseURI";
|
||||
private static String DEFAULT_BASE_URL = "http://localhost:8081";
|
||||
private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL);
|
||||
private static String SQL_SERVER_POST = "sqlserver/postmessage";
|
||||
private static String SQL_SERVER_GET = "sqlserver/readmessages";
|
||||
|
||||
Client client;
|
||||
// Connection connection;
|
||||
|
@ -64,11 +62,8 @@ public class SQLiteServerIntegrationTest {
|
|||
|
||||
// Test writing mechanism
|
||||
|
||||
System.err.println("******** Testing: " + SQL_SERVER_POST);
|
||||
System.err.println(BASE_URL);
|
||||
System.err.println(SQL_SERVER_POST);
|
||||
System.err.println(client.getConfiguration());
|
||||
webTarget = client.target(BASE_URL).path(SQL_SERVER_POST);
|
||||
System.err.println("******** Testing: " + Constants.POST_MESSAGE_PATH);
|
||||
webTarget = client.target(BASE_URL).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.POST_MESSAGE_PATH);
|
||||
System.err.println(webTarget.getUri());
|
||||
|
||||
msg = BulletinBoardMessage.newBuilder()
|
||||
|
@ -114,9 +109,8 @@ public class SQLiteServerIntegrationTest {
|
|||
|
||||
// Test reading mechanism
|
||||
|
||||
System.err.println("******** Testing: " + SQL_SERVER_GET);
|
||||
webTarget = client.target(BASE_URL).path(SQL_SERVER_GET);
|
||||
|
||||
System.err.println("******** Testing: " + Constants.READ_MESSAGES_PATH);
|
||||
webTarget = client.target(BASE_URL).path(Constants.BULLETIN_BOARD_SERVER_PATH).path(Constants.READ_MESSAGES_PATH);
|
||||
filterList = MessageFilterList.newBuilder()
|
||||
.addFilter(
|
||||
MessageFilter.newBuilder()
|
||||
|
|
|
@ -45,6 +45,9 @@ dependencies {
|
|||
// Google protobufs
|
||||
compile 'com.google.protobuf:protobuf-java:3.+'
|
||||
|
||||
// ListeningExecutor
|
||||
compile 'com.google.guava:guava:11.0.+'
|
||||
|
||||
// Crypto
|
||||
compile 'org.factcenter.qilin:qilin:1.1+'
|
||||
compile 'org.bouncycastle:bcprov-jdk15on:1.53'
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import meerkat.comm.*;
|
||||
import meerkat.protobuf.Voting.*;
|
||||
|
||||
import static meerkat.protobuf.BulletinBoardAPI.*;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -10,24 +12,29 @@ import java.util.List;
|
|||
*/
|
||||
public interface BulletinBoardClient {
|
||||
|
||||
interface ClientCallback<T> {
|
||||
void handleCallback(T msg);
|
||||
void handleFailure(Throwable t);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the client to use some specified servers
|
||||
* @param meerkatDBs is the list of database locations
|
||||
* @param clientParams contains the parameters required for the client setup
|
||||
*/
|
||||
void init(List<String> meerkatDBs);
|
||||
void init(BulletinBoardClientParams clientParams);
|
||||
|
||||
/**
|
||||
* Post a message to the bulletin board
|
||||
* @param msg
|
||||
*/
|
||||
MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException;
|
||||
MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback);
|
||||
|
||||
/**
|
||||
* Check how "safe" a given message is
|
||||
* @param id
|
||||
* @return a normalized "redundancy score" from 0 (local only) to 1 (fully published)
|
||||
*/
|
||||
float getRedundancy(MessageID id);
|
||||
void getRedundancy(MessageID id, ClientCallback<Float> callback);
|
||||
|
||||
/**
|
||||
* Read all messages posted matching the given filter
|
||||
|
@ -35,20 +42,7 @@ public interface BulletinBoardClient {
|
|||
* set of messages in different calls. However, messages that are fully posted
|
||||
* are guaranteed to be included.
|
||||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return
|
||||
*/
|
||||
List<BulletinBoardMessage> readMessages(MessageFilterList filterList);
|
||||
|
||||
interface MessageCallback {
|
||||
void handleNewMessage(List<BulletinBoardMessage> msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a callback that will be called with each new message that is posted.
|
||||
* The callback will be called only once for each message.
|
||||
* @param callback
|
||||
* @param filterList only call back for messages that match the filter.
|
||||
*/
|
||||
void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList);
|
||||
void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback);
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,9 @@ message BoolMsg {
|
|||
bool value = 1;
|
||||
}
|
||||
|
||||
message IntMsg {
|
||||
int32 value = 1;
|
||||
}
|
||||
|
||||
message MessageID {
|
||||
// The ID of a message for unique retrieval.
|
||||
|
|
|
@ -52,6 +52,16 @@ message BallotAnswerTranslationTable {
|
|||
bytes data = 1;
|
||||
}
|
||||
|
||||
// Data required in order to access the Bulletin Board Servers
|
||||
message BulletinBoardClientParams {
|
||||
|
||||
// Addresses of all Bulletin Board Servers
|
||||
repeated string bulletinBoardAddress = 1;
|
||||
|
||||
// Threshold fraction of successful servers posts before a post task is considered complete
|
||||
float minRedundancy = 2;
|
||||
}
|
||||
|
||||
message ElectionParams {
|
||||
// TODO: different sets of keys for different roles?
|
||||
repeated SignatureVerificationKey trusteeVerificationKeys = 1;
|
||||
|
@ -75,4 +85,6 @@ message ElectionParams {
|
|||
// Translation table between answers and plaintext encoding
|
||||
BallotAnswerTranslationTable answerTranslationTable = 7;
|
||||
|
||||
// Data required in order to access the Bulletin Board Servers
|
||||
BulletinBoardClientParams bulletinBoardClientParams = 8;
|
||||
}
|
||||
|
|
|
@ -5,4 +5,8 @@ package meerkat.rest;
|
|||
*/
|
||||
public interface Constants {
|
||||
public static final String MEDIATYPE_PROTOBUF = "application/x-protobuf";
|
||||
|
||||
public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver";
|
||||
public static final String READ_MESSAGES_PATH = "/readmessages";
|
||||
public static final String POST_MESSAGE_PATH = "/postmessage";
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue