Dual-layered threaded BB Client.

Supports basic functionality.
Does not support Batch Messages yet.
Bulletin-Board-Batch
Arbel Deutsch Peled 2016-01-17 10:59:05 +02:00
parent d643932ef9
commit 141d286af2
21 changed files with 927 additions and 486 deletions

View File

@ -1,82 +0,0 @@
package meerkat.bulletinboard;
import com.google.protobuf.Message;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
* Created by Arbel Deutsch Peled on 09-Dec-15.
*
* This class specifies the job that is required of a Bulletin Board Client Worker
*/
public class BulletinClientJob {
public static enum JobType{
POST_MESSAGE, // Post a message to servers
READ_MESSAGES, // Read messages according to some given filter (any server will do)
GET_REDUNDANCY // Check the redundancy of a specific message in the databases
}
private List<String> serverAddresses;
private int minServers; // The minimal number of servers the job must be successful on for the job to be completed
private final JobType jobType;
private final Message payload; // The information associated with the job type
private int maxRetry; // Number of retries for this job; set to -1 for infinite retries
public BulletinClientJob(List<String> serverAddresses, int minServers, JobType jobType, Message payload, int maxRetry) {
this.serverAddresses = serverAddresses;
this.minServers = minServers;
this.jobType = jobType;
this.payload = payload;
this.maxRetry = maxRetry;
}
public void updateServerAddresses(List<String> newServerAdresses) {
this.serverAddresses = newServerAdresses;
}
public List<String> getServerAddresses() {
return serverAddresses;
}
public int getMinServers() {
return minServers;
}
public JobType getJobType() {
return jobType;
}
public Message getPayload() {
return payload;
}
public int getMaxRetry() {
return maxRetry;
}
public void shuffleAddresses() {
Collections.shuffle(serverAddresses);
}
public void decMinServers(){
minServers--;
}
public void decMaxRetry(){
if (maxRetry > 0) {
maxRetry--;
}
}
public boolean isRetry(){
return (maxRetry != 0);
}
}

View File

@ -1,29 +0,0 @@
package meerkat.bulletinboard;
import com.google.protobuf.Message;
/**
* Created by Arbel Deutsch Peled on 09-Dec-15.
*
* This class contains the end status and result of a Bulletin Board Client Job.
*/
public final class BulletinClientJobResult {
private final BulletinClientJob job; // Stores the job the result refers to
private final Message result; // The result of the job; valid only if success==true
public BulletinClientJobResult(BulletinClientJob job, Message result) {
this.job = job;
this.result = result;
}
public BulletinClientJob getJob() {
return job;
}
public Message getResult() {
return result;
}
}

View File

@ -1,218 +1,38 @@
package meerkat.bulletinboard;
import com.google.protobuf.Message;
import meerkat.comm.CommunicationException;
import meerkat.crypto.Digest;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.rest.Constants;
import meerkat.rest.ProtobufMessageBodyReader;
import meerkat.rest.ProtobufMessageBodyWriter;
import static meerkat.bulletinboard.BulletinBoardConstants.*;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Created by Arbel Deutsch Peled on 09-Dec-15.
*
* This class implements the actual communication with the Bulletin Board Servers.
* This class handles bulletin client work.
* It is meant to be used in a multi-threaded environment.
*/
//TODO: Maybe make this abstract and inherit from it.
public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
public abstract class BulletinClientWorker<IN> {
private final BulletinClientJob job; // The requested job to be handled
protected IN payload; // Payload of the job
public BulletinClientWorker(BulletinClientJob job){
this.job = job;
private int maxRetry; // Number of retries for this job; set to -1 for infinite retries
public BulletinClientWorker(IN payload, int maxRetry) {
this.payload = payload;
this.maxRetry = maxRetry;
}
// This resource enabled creation of a single Client per thread.
private static final ThreadLocal<Client> clientLocal =
new ThreadLocal<Client> () {
@Override protected Client initialValue() {
Client client;
client = ClientBuilder.newClient();
client.register(ProtobufMessageBodyReader.class);
client.register(ProtobufMessageBodyWriter.class);
public IN getPayload() {
return payload;
}
return client;
}
};
// This resource enables creation of a single Digest per thread.
private static final ThreadLocal<Digest> digestLocal =
new ThreadLocal<Digest> () {
@Override protected Digest initialValue() {
Digest digest;
digest = new SHA256Digest(); //TODO: Make this generic.
return digest;
}
};
/**
* This method carries out the actual communication with the servers via HTTP Post
* It accesses the servers according to the job it received and updates said job as it goes
* The method will only iterate once through the server list, removing servers from the list when they are no longer required
* In a POST_MESSAGE job: successful post to a server results in removing the server from the list
* In a GET_REDUNDANCY job: no server is removed from the list and the (absolute) number of servers in which the message was found is returned
* In a READ_MESSAGES job: successful retrieval from any server terminates the method and returns the received values; The list is not changed
* @return The original job, modified to fit the current state and the required output (if any) of the operation
* @throws IllegalArgumentException
* @throws CommunicationException
*/
public BulletinClientJobResult call() throws IllegalArgumentException, CommunicationException{
Client client = clientLocal.get();
Digest digest = digestLocal.get();
WebTarget webTarget;
Response response;
String requestPath;
Message msg;
List<String> serverAddresses = new LinkedList<String>(job.getServerAddresses());
Message payload = job.getPayload();
BulletinBoardMessageList msgList;
int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request.
job.shuffleAddresses(); // This is done to randomize the order of access to servers primarily for READ operations
// Prepare the request.
switch(job.getJobType()) {
case POST_MESSAGE:
// Make sure the payload is a BulletinBoardMessage
if (!(payload instanceof BulletinBoardMessage)) {
throw new IllegalArgumentException("Cannot post an object that is not an instance of BulletinBoardMessage");
}
msg = payload;
requestPath = POST_MESSAGE_PATH;
break;
case READ_MESSAGES:
// Make sure the payload is a MessageFilterList
if (!(payload instanceof MessageFilterList)) {
throw new IllegalArgumentException("Read failed: an instance of MessageFilterList is required as payload for a READ_MESSAGES operation");
}
msg = payload;
requestPath = READ_MESSAGES_PATH;
break;
case GET_REDUNDANCY:
// Make sure the payload is a MessageId
if (!(payload instanceof MessageID)) {
throw new IllegalArgumentException("Cannot search for an object that is not an instance of MessageID");
}
requestPath = READ_MESSAGES_PATH;
msg = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.MSG_ID)
.setId(((MessageID) payload).getID())
.build()
).build();
break;
default:
throw new IllegalArgumentException("Unsupported job type");
}
// Iterate through servers
Iterator<String> addressIterator = serverAddresses.iterator();
while (addressIterator.hasNext()) {
// Send request to Server
String address = addressIterator.next();
webTarget = client.target(address).path(BULLETIN_BOARD_SERVER_PATH).path(requestPath);
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msg, Constants.MEDIATYPE_PROTOBUF));
// Retrieve answer
switch(job.getJobType()) {
case POST_MESSAGE:
try {
response.readEntity(BoolMsg.class); // If a BoolMsg entity is returned: the post was successful
addressIterator.remove(); // Post to this server succeeded: remove server from list
job.decMinServers();
} catch (ProcessingException | IllegalStateException e) {} // Post to this server failed: retry next time
finally {
response.close();
}
break;
case GET_REDUNDANCY:
try {
msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful
if (msgList.getMessageList().size() > 0){ // Message was found in the server.
count++;
}
} catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server
finally {
response.close();
}
break;
case READ_MESSAGES:
try {
msgList = response.readEntity(BulletinBoardMessageList.class); // If a BulletinBoardMessageList is returned: the read was successful
return new BulletinClientJobResult(job, msgList); // Return the result
} catch (ProcessingException | IllegalStateException e) {} // Read failed: try with next server
finally {
response.close();
}
break;
}
}
// Return result (if haven't done so yet)
switch(job.getJobType()) {
case POST_MESSAGE:
// The job now contains the information required to ascertain whether enough server posts have succeeded
// It will also contain the list of servers in which the post was not successful
job.updateServerAddresses(serverAddresses);
return new BulletinClientJobResult(job, null);
case GET_REDUNDANCY:
// Return the number of servers in which the message was found
// The job now contains the list of these servers
return new BulletinClientJobResult(job, IntMsg.newBuilder().setValue(count).build());
case READ_MESSAGES:
// A successful operation would have already returned an output
// Therefore: no server access was successful
throw new CommunicationException("Could not access any server");
default: // This is required for successful compilation
throw new IllegalArgumentException("Unsupported job type");
public int getMaxRetry() {
return maxRetry;
}
public void decMaxRetry(){
if (maxRetry > 0) {
maxRetry--;
}
}
public boolean isRetry(){
return (maxRetry != 0);
}
}

View File

@ -0,0 +1,104 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Arbel Deutsch Peled on 09-Dec-15.
*
* This is a general class for handling multi-server work
* It utilizes Single Server Clients to perform the actual per-server work
*/
public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN> implements Runnable, ClientCallback<OUT>{
private List<SingleServerBulletinBoardClient> clients;
protected AtomicInteger minServers; // The minimal number of servers the job must be successful on for the job to be completed
protected AtomicInteger maxFailedServers; // The maximal number of allowed server failures
private AtomicBoolean returnedResult;
private ClientCallback<OUT> clientCallback;
/**
* Constructor
* @param clients contains a list of Single Server clients to handle requests
* @param shuffleClients is a boolean stating whether or not it is needed to shuffle the clients
* @param minServers is the minimal amount of servers needed in order to successfully complete the job
* @param payload is the payload for the job
* @param maxRetry is the maximal per-server retry count
* @param clientCallback contains the callback methods used to report the result back to the client
*/
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients, boolean shuffleClients,
int minServers, IN payload, int maxRetry,
ClientCallback<OUT> clientCallback) {
super(payload,maxRetry);
this.clients = clients;
if (shuffleClients){
Collections.shuffle(clients);
}
this.minServers = new AtomicInteger(minServers);
maxFailedServers = new AtomicInteger(clients.size() - minServers);
this.clientCallback = clientCallback;
returnedResult = new AtomicBoolean(false);
}
/**
* Constructor overload without client shuffling
*/
public MultiServerWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, IN payload, int maxRetry,
ClientCallback<OUT> clientCallback) {
this(clients, false, minServers, payload, maxRetry, clientCallback);
}
/**
* Used to report a successful operation to the client
* Only reports once to the client
* @param result is the result
*/
protected void succeed(OUT result){
if (returnedResult.compareAndSet(false, true)) {
clientCallback.handleCallback(result);
}
}
/**
* Used to report a failed operation to the client
* Only reports once to the client
* @param t contains the error/exception that occurred
*/
protected void fail(Throwable t){
if (returnedResult.compareAndSet(false, true)) {
clientCallback.handleFailure(t);
}
}
/**
* Used by implementations to get a Single Server Client iterator
* @return the requested iterator
*/
protected Iterator<SingleServerBulletinBoardClient> getClientIterator() {
return clients.iterator();
}
protected int getClientNumber() {
return clients.size();
}
}

View File

@ -0,0 +1,228 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import meerkat.bulletinboard.workers.SingleServerGetRedundancyWorker;
import meerkat.bulletinboard.workers.SingleServerPostMessageWorker;
import meerkat.bulletinboard.workers.SingleServerReadMessagesWorker;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.BulletinBoardClientParams;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Arbel Deutsch Peled on 28-Dec-15.
*
* This class implements the asynchronous Bulletin Board Client interface
* It only handles a single Bulletin Board Server
* If the list of servers contains more than one server: the server actually used is the first one
* The class further implements a delayed access to the server after a communication error occurs
*/
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient {
private final int MAX_RETRIES = 10;
protected ListeningScheduledExecutorService executorService;
private long lastServerErrorTime;
protected final long failDelayInMilliseconds;
/**
* Notify the client that a job has failed
* This makes new scheduled jobs be scheduled for a later time (after the given delay)
*/
protected void fail() {
// Update last fail time
lastServerErrorTime = System.currentTimeMillis();
}
/**
* This method adds a worker to the scheduled queue of the threadpool
* If the server is in an accessible state: the job is submitted for immediate handling
* If the server is not accessible: the job is scheduled for a later time
* @param worker is the worker that should be scheduled for work
* @param callback is the class containing callbacks for handling job completion/failure
*/
protected void scheduleWorker(SingleServerWorker worker, FutureCallback callback){
long timeSinceLastServerError = System.currentTimeMillis() - lastServerErrorTime;
if (timeSinceLastServerError >= failDelayInMilliseconds) {
// Schedule for immediate processing
Futures.addCallback(executorService.submit(worker), callback);
} else {
// Schedule for processing immediately following delay expiry
Futures.addCallback(executorService.schedule(
worker,
failDelayInMilliseconds - timeSinceLastServerError,
TimeUnit.MILLISECONDS),
callback);
}
}
/**
* Inner class for handling simple operation results and retrying if needed
*/
class RetryCallback<T> implements FutureCallback<T> {
private SingleServerWorker worker;
private ClientCallback<T> clientCallback;
public RetryCallback(SingleServerWorker worker, ClientCallback<T> clientCallback) {
this.worker = worker;
this.clientCallback = clientCallback;
}
@Override
public void onSuccess(T result) {
clientCallback.handleCallback(result);
}
@Override
public void onFailure(Throwable t) {
// Notify client about failure
fail();
// Check if another attempt should be made
worker.decMaxRetry();
if (worker.isRetry()) {
// Perform another attempt
scheduleWorker(worker, this);
} else {
// No more retries: notify caller about failure
clientCallback.handleFailure(t);
}
}
}
public SingleServerBulletinBoardClient(int threadPoolSize, long failDelayInMilliseconds) {
executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize));
this.failDelayInMilliseconds = failDelayInMilliseconds;
// Set server error time to a time sufficiently in the past to make new jobs go through
lastServerErrorTime = System.currentTimeMillis() - failDelayInMilliseconds;
}
/**
* Stores database location, initializes the web Client and
* @param clientParams contains the data needed to access the DBs
*/
@Override
public void init(BulletinBoardClientParams clientParams) {
// Perform usual setup
super.init(clientParams);
// Remove all but first DB address
String dbAddress = meerkatDBs.get(0);
meerkatDBs = new LinkedList<String>();
meerkatDBs.add(dbAddress);
}
@Override
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback) {
// Create worker with redundancy 1 and MAX_RETRIES retries
SingleServerPostMessageWorker worker = new SingleServerPostMessageWorker(meerkatDBs.get(0), msg, MAX_RETRIES);
// Submit worker and create callback
scheduleWorker(worker, new RetryCallback(worker, callback));
// Calculate the correct message ID and return it
digest.reset();
digest.update(msg.getMsg());
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
}
@Override
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
return null;
}
@Override
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback) {
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
int startPosition, ClientCallback<Boolean> callback) {
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
}
@Override
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback) {
}
@Override
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
// Create worker with no retries
SingleServerGetRedundancyWorker worker = new SingleServerGetRedundancyWorker(meerkatDBs.get(0), id, 1);
// Submit job and create callback
scheduleWorker(worker, new RetryCallback(worker, callback));
}
@Override
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
// Create job with no retries
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(meerkatDBs.get(0), filterList, 1);
// Submit job and create callback
scheduleWorker(worker, new RetryCallback(worker, callback));
}
@Override
public void readBatch(byte[] signerId, int batchId, ClientCallback<CompleteBatch> callback) {
}
@Override
public void subscribe(MessageFilterList filterList, MessageHandler messageHandler) {
}
@Override
public void close() {
super.close();
executorService.shutdown();
}
}

View File

@ -0,0 +1,39 @@
package meerkat.bulletinboard;
import meerkat.rest.ProtobufMessageBodyReader;
import meerkat.rest.ProtobufMessageBodyWriter;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import java.util.concurrent.Callable;
/**
* Created by Arbel Deutsch Peled on 02-Jan-16.
*/
public abstract class SingleServerWorker<IN, OUT> extends BulletinClientWorker<IN> implements Callable<OUT>{
// This resource enabled creation of a single Client per thread.
protected static final ThreadLocal<Client> clientLocal =
new ThreadLocal<Client> () {
@Override protected Client initialValue() {
Client client;
client = ClientBuilder.newClient();
client.register(ProtobufMessageBodyReader.class);
client.register(ProtobufMessageBodyWriter.class);
return client;
}
};
protected String serverAddress;
public SingleServerWorker(String serverAddress, IN payload, int maxRetry) {
super(payload, maxRetry);
this.serverAddress = serverAddress;
}
public String getServerAddress() {
return serverAddress;
}
}

View File

@ -1,15 +1,17 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.*;
import com.google.protobuf.ByteString;
import meerkat.bulletinboard.callbacks.GetRedundancyFutureCallback;
import meerkat.bulletinboard.callbacks.PostMessageFutureCallback;
import meerkat.bulletinboard.callbacks.ReadMessagesFutureCallback;
import meerkat.bulletinboard.workers.MultiServerGetRedundancyWorker;
import meerkat.bulletinboard.workers.MultiServerPostMessageWorker;
import meerkat.bulletinboard.workers.MultiServerReadMessagesWorker;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -22,10 +24,19 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient {
private final static int THREAD_NUM = 10;
ListeningExecutorService listeningExecutor;
// Executor service for handling jobs
private final static int JOBS_THREAD_NUM = 5;
ExecutorService executorService;
// Per-server clients
List<SingleServerBulletinBoardClient> clients;
private final static int POST_MESSAGE_RETRY_NUM = 3;
private final static int READ_MESSAGES_RETRY_NUM = 1;
private final static int GET_REDUNDANCY_RETRY_NUM = 1;
private static final int SERVER_THREADPOOL_SIZE = 5;
private static final long FAIL_DELAY = 5000;
private int minAbsoluteRedundancy;
@ -42,7 +53,16 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount());
listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM));
executorService = Executors.newFixedThreadPool(JOBS_THREAD_NUM);
clients = new ArrayList<SingleServerBulletinBoardClient>(clientParams.getBulletinBoardAddressCount());
for (String address : clientParams.getBulletinBoardAddressList()){
SingleServerBulletinBoardClient client = new SingleServerBulletinBoardClient(SERVER_THREADPOOL_SIZE, FAIL_DELAY);
client.init(BulletinBoardClientParams.newBuilder()
.addBulletinBoardAddress(address)
.build());
clients.add(client);
}
}
@ -54,28 +74,52 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
* @throws CommunicationException
*/
@Override
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback){
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback){
// Create job
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.POST_MESSAGE, msg, -1);
MultiServerPostMessageWorker worker =
new MultiServerPostMessageWorker(clients, minAbsoluteRedundancy, msg, POST_MESSAGE_RETRY_NUM, callback);
// Submit job and create callback
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new PostMessageFutureCallback(listeningExecutor, callback));
// Submit job
executorService.submit(worker);
// Calculate the correct message ID and return it
digest.reset();
digest.update(msg.getMsg());
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
}
@Override
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, ClientCallback<?> callback) {
return null; //TODO: Implement
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback) {
return null; // TODO: write this
}
@Override
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<?> callback) {
return null; //TODO: Implement
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback) {
// TODO: write this
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
int startPosition, ClientCallback<Boolean> callback) {
// TODO: write this
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback) {
postBatchData(signerId, batchId, batchDataList, 0, callback); // Write batch from beginning
}
@Override
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback) {
// TODO: write this
}
/**
@ -88,10 +132,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
// Create job
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY, id, 1);
MultiServerGetRedundancyWorker worker =
new MultiServerGetRedundancyWorker(clients, minAbsoluteRedundancy, id, GET_REDUNDANCY_RETRY_NUM, callback);
// Submit job and create callback
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new GetRedundancyFutureCallback(listeningExecutor, callback));
// Submit job
executorService.submit(worker);
}
@ -104,11 +149,11 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
// Create job
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.READ_MESSAGES,
filterList, READ_MESSAGES_RETRY_NUM);
MultiServerReadMessagesWorker worker =
new MultiServerReadMessagesWorker(clients, minAbsoluteRedundancy, filterList, READ_MESSAGES_RETRY_NUM, callback);
// Submit job and create callback
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), new ReadMessagesFutureCallback(listeningExecutor, callback));
// Submit job
executorService.submit(worker);
}
@ -127,9 +172,9 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
super.close();
try {
listeningExecutor.shutdown();
while (! listeningExecutor.isShutdown()) {
listeningExecutor.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();
while (! executorService.isShutdown()) {
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
System.err.println(e.getCause() + " " + e.getMessage());

View File

@ -1,19 +0,0 @@
package meerkat.bulletinboard.callbacks;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningExecutorService;
import meerkat.bulletinboard.BulletinClientJobResult;
/**
* This is a future callback used to listen to workers and run on job finish
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
*/
public abstract class ClientFutureCallback implements FutureCallback<BulletinClientJobResult> {
protected ListeningExecutorService listeningExecutor;
ClientFutureCallback(ListeningExecutorService listeningExecutor) {
this.listeningExecutor = listeningExecutor;
}
}

View File

@ -1,38 +0,0 @@
package meerkat.bulletinboard.callbacks;
import com.google.common.util.concurrent.ListeningExecutorService;
import meerkat.bulletinboard.AsyncBulletinBoardClient.*;
import meerkat.bulletinboard.BulletinClientJobResult;
import meerkat.protobuf.BulletinBoardAPI.*;
import java.util.List;
/**
* This is a future callback used to listen to workers and run on job finish
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
*/
public class GetRedundancyFutureCallback extends ClientFutureCallback {
private ClientCallback<Float> callback;
public GetRedundancyFutureCallback(ListeningExecutorService listeningExecutor,
ClientCallback<Float> callback) {
super(listeningExecutor);
this.callback = callback;
}
@Override
public void onSuccess(BulletinClientJobResult result) {
int absoluteRedundancy = ((IntMsg) result.getResult()).getValue();
int totalServers = result.getJob().getServerAddresses().size();
callback.handleCallback( ((float) absoluteRedundancy) / ((float) totalServers) );
}
@Override
public void onFailure(Throwable t) {
callback.handleFailure(t);
}
}

View File

@ -1,44 +0,0 @@
package meerkat.bulletinboard.callbacks;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import meerkat.bulletinboard.AsyncBulletinBoardClient.*;
import meerkat.bulletinboard.BulletinClientJob;
import meerkat.bulletinboard.BulletinClientJobResult;
import meerkat.bulletinboard.BulletinClientWorker;
/**
* This is a future callback used to listen to workers and run on job finish
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
*/
public class PostMessageFutureCallback extends ClientFutureCallback {
private ClientCallback<?> callback;
public PostMessageFutureCallback(ListeningExecutorService listeningExecutor,
ClientCallback<?> callback) {
super(listeningExecutor);
this.callback = callback;
}
@Override
public void onSuccess(BulletinClientJobResult result) {
BulletinClientJob job = result.getJob();
job.decMaxRetry();
// If redundancy is below threshold: retry
if (job.getMinServers() > 0 && job.isRetry()) {
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), this);
}
callback.handleCallback(null);
}
@Override
public void onFailure(Throwable t) {
callback.handleFailure(t);
}
}

View File

@ -1,35 +0,0 @@
package meerkat.bulletinboard.callbacks;
import com.google.common.util.concurrent.ListeningExecutorService;
import meerkat.bulletinboard.AsyncBulletinBoardClient.*;
import meerkat.bulletinboard.BulletinClientJobResult;
import meerkat.protobuf.BulletinBoardAPI;
import java.util.List;
/**
* This is a future callback used to listen to workers and run on job finish
* Depending on the type of job and the finishing status of the worker: a decision is made whether to retry or return an error
*/
public class ReadMessagesFutureCallback extends ClientFutureCallback {
private ClientCallback<List<BulletinBoardAPI.BulletinBoardMessage>> callback;
public ReadMessagesFutureCallback(ListeningExecutorService listeningExecutor,
ClientCallback<List<BulletinBoardAPI.BulletinBoardMessage>> callback) {
super(listeningExecutor);
this.callback = callback;
}
@Override
public void onSuccess(BulletinClientJobResult result) {
callback.handleCallback(((BulletinBoardAPI.BulletinBoardMessageList) result.getResult()).getMessageList());
}
@Override
public void onFailure(Throwable t) {
callback.handleFailure(t);
}
}

View File

@ -0,0 +1,74 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
import meerkat.bulletinboard.MultiServerWorker;
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class MultiServerGetRedundancyWorker extends MultiServerWorker<MessageID, Float> {
private AtomicInteger serversContainingMessage;
private AtomicInteger totalContactedServers;
public MultiServerGetRedundancyWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, MessageID payload, int maxRetry,
ClientCallback<Float> clientCallback) {
super(clients, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
serversContainingMessage = new AtomicInteger(0);
totalContactedServers = new AtomicInteger(0);
}
/**
* This method carries out the actual communication with the servers via HTTP Post
* It accesses the servers in a random order until one answers it
* Successful retrieval from any server terminates the method and returns the received values; The list is not changed
* @return The original job and the list of messages found in the first server that answered the query
* @throws CommunicationException
*/
public void run(){
Iterator<SingleServerBulletinBoardClient> clientIterator = getClientIterator();
// Iterate through clients
while (clientIterator.hasNext()) {
SingleServerBulletinBoardClient client = clientIterator.next();
// Send request to client
client.getRedundancy(payload,this);
}
}
@Override
public void handleCallback(Float result) {
if (result > 0.5) {
serversContainingMessage.incrementAndGet();
}
if (totalContactedServers.incrementAndGet() >= getClientNumber()){
succeed(new Float(((float) serversContainingMessage.get()) / ((float) getClientNumber()) ));
}
}
@Override
public void handleFailure(Throwable t) {
handleCallback(new Float(0.0));
}
}

View File

@ -0,0 +1,7 @@
package meerkat.bulletinboard.workers;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class MultiServerPostBatchWorker {
}

View File

@ -0,0 +1,72 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
import meerkat.bulletinboard.MultiServerWorker;
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.util.Iterator;
import java.util.List;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class MultiServerPostMessageWorker extends MultiServerWorker<BulletinBoardMessage, Boolean> {
public MultiServerPostMessageWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, BulletinBoardMessage payload, int maxRetry,
ClientCallback<Boolean> clientCallback) {
super(clients, minServers, payload, maxRetry, clientCallback);
}
/**
* This method carries out the actual communication with the servers via HTTP Post
* It accesses the servers one by one and tries to post the payload to each in turn
* The method will only iterate once through the server list
* Successful post to a server results in removing the server from the list
* @return The original job, but with a modified server list
* @throws CommunicationException
*/
public void run() {
WebTarget webTarget;
Response response;
int count = 0; // Used to count number of servers which contain the required message in a GET_REDUNDANCY request.
// Iterate through servers
Iterator<SingleServerBulletinBoardClient> clientIterator = getClientIterator();
while (clientIterator.hasNext()) {
// Send request to Server
SingleServerBulletinBoardClient client = clientIterator.next();
client.postMessage(payload, this);
}
}
@Override
public void handleCallback(Boolean result) {
if (result){
if (minServers.decrementAndGet() <= 0){
succeed(Boolean.TRUE);
}
}
}
@Override
public void handleFailure(Throwable t) {
if (maxFailedServers.decrementAndGet() < 0){
fail(t);
}
}
}

View File

@ -0,0 +1,65 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.AsyncBulletinBoardClient.ClientCallback;
import meerkat.bulletinboard.MultiServerWorker;
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import java.util.Iterator;
import java.util.List;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class MultiServerReadMessagesWorker extends MultiServerWorker<MessageFilterList,List<BulletinBoardMessage>>{
private Iterator<SingleServerBulletinBoardClient> clientIterator;
public MultiServerReadMessagesWorker(List<SingleServerBulletinBoardClient> clients,
int minServers, MessageFilterList payload, int maxRetry,
ClientCallback<List<BulletinBoardMessage>> clientCallback) {
super(clients, true, minServers, payload, maxRetry, clientCallback); // Shuffle clients on creation to balance load
clientIterator = getClientIterator();
}
/**
* This method carries out the actual communication with the servers via HTTP Post
* It accesses the servers in a random order until one answers it
* Successful retrieval from any server terminates the method and returns the received values; The list is not changed
* @return The original job and the list of messages found in the first server that answered the query
* @throws CommunicationException
*/
public void run(){
// Iterate through servers
if (clientIterator.hasNext()) {
// Send request to Server
SingleServerBulletinBoardClient client = clientIterator.next();
// Retrieve answer
client.readMessages(payload,this);
} else {
fail(new CommunicationException("Could not contact any server"));
}
}
@Override
public void handleCallback(List<BulletinBoardMessage> msg) {
succeed(msg);
}
@Override
public void handleFailure(Throwable t) {
run(); // Retry with next server
}
}

View File

@ -0,0 +1,79 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.SingleServerWorker;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.rest.Constants;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class SingleServerGetRedundancyWorker extends SingleServerWorker<MessageID, Float> {
public SingleServerGetRedundancyWorker(String serverAddress, MessageID payload, int maxRetry) {
super(serverAddress, payload, maxRetry);
}
/**
* This method carries out the actual communication with the server via HTTP Post
* It queries the server for a message with the given ID
* @return TRUE if the message exists in the server and FALSE otherwise
* @throws CommunicationException if the server does not return a valid answer
*/
public Float call() throws CommunicationException{
Client client = clientLocal.get();
WebTarget webTarget;
Response response;
MessageFilterList msgFilterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.MSG_ID)
.setId(payload.getID())
.build()
).build();
// Send request to Server
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(msgFilterList, Constants.MEDIATYPE_PROTOBUF));
// Retrieve answer
try {
// If a BulletinBoardMessageList is returned: the read was successful
BulletinBoardMessageList msgList = response.readEntity(BulletinBoardMessageList.class);
if (msgList.getMessageList().size() > 0){
// Message exists in the server
return new Float(1.0);
}
else {
// Message does not exist in the server
return new Float(0.0);
}
} catch (ProcessingException | IllegalStateException e) {
// Read failed
throw new CommunicationException("Server access failed");
}
finally {
response.close();
}
}
}

View File

@ -0,0 +1,61 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.SingleServerWorker;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.BoolMsg;
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
import meerkat.rest.Constants;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
import static meerkat.bulletinboard.BulletinBoardConstants.POST_MESSAGE_PATH;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
* Tries to contact server once and perform a post operation
*/
public class SingleServerPostMessageWorker extends SingleServerWorker<BulletinBoardMessage, Boolean> {
public SingleServerPostMessageWorker(String serverAddress, BulletinBoardMessage payload, int maxRetry) {
super(serverAddress, payload, maxRetry);
}
/**
* This method carries out the actual communication with the server via HTTP Post
* It accesses the server and tries to post the payload to it
* Successful post to a server results
* @return TRUE if the operation is successful
* @throws CommunicationException if the operation is unseccessful
*/
public Boolean call() throws CommunicationException{
Client client = clientLocal.get();
WebTarget webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(POST_MESSAGE_PATH);;
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
try {
// If a BoolMsg entity is returned: the post was successful
response.readEntity(BoolMsg.class);
return Boolean.TRUE;
} catch (ProcessingException | IllegalStateException e) {
// Post to this server failed
throw new CommunicationException("Could not contact the server");
}
finally {
response.close();
}
}
}

View File

@ -0,0 +1,68 @@
package meerkat.bulletinboard.workers;
import meerkat.bulletinboard.SingleServerWorker;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList;
import meerkat.protobuf.BulletinBoardAPI.MessageFilterList;
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
import meerkat.rest.Constants;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.util.List;
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
import static meerkat.bulletinboard.BulletinBoardConstants.READ_MESSAGES_PATH;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
*/
public class SingleServerReadMessagesWorker extends SingleServerWorker<MessageFilterList, List<BulletinBoardMessage>> {
public SingleServerReadMessagesWorker(String serverAddress, MessageFilterList payload, int maxRetry) {
super(serverAddress, payload, maxRetry);
}
/**
* This method carries out the actual communication with the server via HTTP Post
* Upon successful retrieval from the server the method returns the received values
* @return The list of messages returned by the server
* @throws CommunicationException if the server's response is invalid
*/
public List<BulletinBoardMessage> call() throws CommunicationException{
Client client = clientLocal.get();
WebTarget webTarget;
Response response;
// Send request to Server
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
// Retrieve answer
try {
// If a BulletinBoardMessageList is returned: the read was successful
return response.readEntity(BulletinBoardMessageList.class).getMessageList();
} catch (ProcessingException | IllegalStateException e) {
// Read failed
throw new CommunicationException("Could not contact the server");
}
finally {
response.close();
}
}
}

View File

@ -32,10 +32,10 @@ public class BulletinBoardClientIntegrationTest {
jobSemaphore.release();
}
private class PostCallback implements ClientCallback<Object>{
private class PostCallback implements ClientCallback<Boolean>{
@Override
public void handleCallback(Object msg) {
public void handleCallback(Boolean msg) {
System.err.println("Post operation completed");
jobSemaphore.release();
}

View File

@ -46,7 +46,7 @@ dependencies {
compile 'com.google.protobuf:protobuf-java:3.+'
// ListeningExecutor
compile 'com.google.guava:guava:11.0.+'
compile 'com.google.guava:guava:15.0'
// Crypto
compile 'org.factcenter.qilin:qilin:1.2+'

View File

@ -1,6 +1,7 @@
package meerkat.bulletinboard;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto.Signature;
import java.util.List;
@ -24,23 +25,48 @@ public interface AsyncBulletinBoardClient extends BulletinBoardClient {
* @param callback is a class containing methods to handle the result of the operation
* @return a unique message ID for the message, that can be later used to retrieve the batch
*/
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback);
public MessageID postMessage(BulletinBoardMessage msg, ClientCallback<Boolean> callback);
/**
* This method allows for sending large messages as a batch to the bulletin board
* Perform an end-to-end post of a signed batch message
* @param completeBatch contains all the data of the batch including the meta-data and the signature
* @param callback is a class containing methods to handle the result of the operation
* @return a unique identifier for the batch message
*/
public MessageID postBatch(CompleteBatch completeBatch, ClientCallback<Boolean> callback);
/**
* This message informs the server about the existence of a new batch message and supplies it with the tags associated with it
* @param signerId is the canonical form for the ID of the sender of this batch
* @param batchId is a unique (per signer) ID for this batch
* @param batchDataList is the (canonically ordered) list of data comprising the batch message
* @param startPosition is the location (in the batch) of the first entry in batchDataList (optionally used to continue interrupted post operations)
* @param callback is a callback function class for handling results of the operation
* @return a unique message ID for the entire message, that can be later used to retrieve the batch
* @param tagList is a list of tags that belong to the batch message
*/
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, ClientCallback<?> callback);
public void beginBatch(byte[] signerId, int batchId, List<String> tagList, ClientCallback<Boolean> callback);
/**
* Overloading of the postBatch method in which startPosition is set to the default value 0
* This method posts batch data into an (assumed to be open) batch
* It does not close the batch
* @param signerId is the canonical form for the ID of the sender of this batch
* @param batchId is a unique (per signer) ID for this batch
* @param batchDataList is the (canonically ordered) list of data comprising the entire batch message (not just the portion to be written)
* @param startPosition is the location (in the batch) of the first entry in batchDataList
* (optionally used to continue interrupted post operations)
* @param callback is a callback function class for handling results of the operation
*/
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<?> callback);
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList,
int startPosition, ClientCallback<Boolean> callback);
/**
* Overloading of the postBatchData method in which startPosition is set to the default value 0
*/
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<Boolean> callback);
/**
* Attempts to close a batch message
* @param closeBatchMessage contains the data required to close the batch
* @param callback is a callback function class for handling results of the operation
*/
public void closeBatch(CloseBatchMessage closeBatchMessage, ClientCallback<?> callback);
/**
* Check how "safe" a given message is in an asynchronous manner