Split interface into BulletinBoardClient and AsyncBulletinBoardClient.
Added Batch Messages Bulletin Board Client interface and associated ProtoBufs. Returned simple implementation of BulletinBoardClient. Made ThreadedBulletinBoardClient extend SimpleBulletinBoardClient. Fixed an issue in SQLite where identical Signatures could be added to the same message.Bulletin-Board-Batch
parent
79d29a05d3
commit
b17954adc2
|
@ -5,8 +5,7 @@ import meerkat.comm.CommunicationException;
|
|||
import meerkat.crypto.Digest;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Voting;
|
||||
import meerkat.protobuf.Voting.BulletinBoardClientParams;
|
||||
import meerkat.protobuf.Voting.*;
|
||||
import meerkat.rest.*;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -19,23 +18,24 @@ import javax.ws.rs.core.Response;
|
|||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
* Implements BulletinBoardClient interface in a simple, straightforward manner
|
||||
*/
|
||||
public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
||||
public class SimpleBulletinBoardClient implements BulletinBoardClient{
|
||||
|
||||
private List<String> meerkatDBs;
|
||||
protected List<String> meerkatDBs;
|
||||
|
||||
private Client client;
|
||||
protected Client client;
|
||||
|
||||
private Digest digest;
|
||||
protected Digest digest;
|
||||
|
||||
/**
|
||||
* Stores database locations and initializes the web Client
|
||||
* @param clientParams contains the data needed to access the DBs
|
||||
*/
|
||||
// @Override
|
||||
public void init(Voting.BulletinBoardClientParams clientParams) {
|
||||
@Override
|
||||
public void init(BulletinBoardClientParams clientParams) {
|
||||
|
||||
meerkatDBs = clientParams.getBulletinBoardAddressList();
|
||||
this.meerkatDBs = clientParams.getBulletinBoardAddressList();
|
||||
|
||||
client = ClientBuilder.newClient();
|
||||
client.register(ProtobufMessageBodyReader.class);
|
||||
|
@ -52,7 +52,7 @@ public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
|||
* @return the message ID for later retrieval
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
// @Override
|
||||
@Override
|
||||
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
|
||||
|
||||
WebTarget webTarget;
|
||||
|
@ -88,7 +88,7 @@ public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
|||
* @param id is the requested message ID
|
||||
* @return the number of DBs in which retrieval was successful
|
||||
*/
|
||||
// @Override
|
||||
@Override
|
||||
public float getRedundancy(MessageID id) {
|
||||
WebTarget webTarget;
|
||||
Response response;
|
||||
|
@ -125,7 +125,7 @@ public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
|||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return
|
||||
*/
|
||||
// @Override
|
||||
@Override
|
||||
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
|
||||
WebTarget webTarget;
|
||||
Response response;
|
||||
|
@ -154,8 +154,8 @@ public class SimpleBulletinBoardClient{ //implements BulletinBoardClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void registerNewMessageCallback(MessageCallback callback, MessageFilterList filterList) {
|
||||
// callback.handleNewMessage(readMessages(filterList));
|
||||
// }
|
||||
public void close() {
|
||||
client.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,10 +6,8 @@ import meerkat.bulletinboard.callbacks.GetRedundancyFutureCallback;
|
|||
import meerkat.bulletinboard.callbacks.PostMessageFutureCallback;
|
||||
import meerkat.bulletinboard.callbacks.ReadMessagesFutureCallback;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.crypto.Digest;
|
||||
import meerkat.crypto.concrete.SHA256Digest;
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Voting;
|
||||
import meerkat.protobuf.Voting.*;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -17,22 +15,16 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 05-Dec-15.
|
||||
* Thread-based implementation of a Bulletin Board Client.
|
||||
* Thread-based implementation of a Async Bulletin Board Client.
|
||||
* Features:
|
||||
* 1. Handles tasks concurrently.
|
||||
* 2. Retries submitting
|
||||
*/
|
||||
public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
||||
public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient implements AsyncBulletinBoardClient {
|
||||
|
||||
private final static int THREAD_NUM = 10;
|
||||
ListeningExecutorService listeningExecutor;
|
||||
|
||||
private Digest digest;
|
||||
|
||||
private List<String> meerkatDBs;
|
||||
private String postSubAddress;
|
||||
private String readSubAddress;
|
||||
|
||||
private final static int READ_MESSAGES_RETRY_NUM = 1;
|
||||
|
||||
private int minAbsoluteRedundancy;
|
||||
|
@ -44,15 +36,13 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
|||
* @param clientParams contains the required information
|
||||
*/
|
||||
@Override
|
||||
public void init(Voting.BulletinBoardClientParams clientParams) {
|
||||
public void init(BulletinBoardClientParams clientParams) {
|
||||
|
||||
meerkatDBs = clientParams.getBulletinBoardAddressList();
|
||||
super.init(clientParams);
|
||||
|
||||
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * meerkatDBs.size());
|
||||
minAbsoluteRedundancy = (int) (clientParams.getMinRedundancy() * clientParams.getBulletinBoardAddressCount());
|
||||
|
||||
listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM));
|
||||
|
||||
digest = new SHA256Digest();
|
||||
listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUM));
|
||||
|
||||
}
|
||||
|
||||
|
@ -78,13 +68,21 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
|||
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, ClientCallback<?> callback) {
|
||||
return null; //TODO: Implement
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<?> callback) {
|
||||
return null; //TODO: Implement
|
||||
}
|
||||
|
||||
/**
|
||||
* Access each database and search for a given message ID
|
||||
* Return the number of databases in which the message was found
|
||||
* Only try once per DB
|
||||
* Ignore communication exceptions in specific databases
|
||||
* @param id is the requested message ID
|
||||
* @return the number of DBs in which retrieval was successful
|
||||
*/
|
||||
@Override
|
||||
public void getRedundancy(MessageID id, ClientCallback<Float> callback) {
|
||||
|
@ -101,8 +99,6 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
|||
* Go through the DBs and try to retrieve messages according to the specified filter
|
||||
* If at the operation is successful for some DB: return the results and stop iterating
|
||||
* If no operation is successful: return null (NOT blank list)
|
||||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
|
||||
|
@ -116,8 +112,15 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBatch(byte[] signerId, int batchId, ClientCallback<SignedBatch> callback) {
|
||||
// TODO: Implement
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
|
||||
try {
|
||||
listeningExecutor.shutdown();
|
||||
while (! listeningExecutor.isShutdown()) {
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import com.google.protobuf.ByteString;
|
||||
import meerkat.bulletinboard.AsyncBulletinBoardClient;
|
||||
import meerkat.bulletinboard.BulletinBoardClient;
|
||||
import meerkat.bulletinboard.BulletinBoardClient.ClientCallback;
|
||||
import meerkat.bulletinboard.ThreadedBulletinBoardClient;
|
||||
|
@ -97,7 +98,7 @@ public class BulletinBoardClientIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
private BulletinBoardClient bulletinBoardClient;
|
||||
private AsyncBulletinBoardClient bulletinBoardClient;
|
||||
|
||||
private PostCallback postCallback;
|
||||
private RedundancyCallback redundancyCallback;
|
||||
|
|
|
@ -98,10 +98,9 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi
|
|||
+ " REFERENCES MsgTable(EntryNum), FOREIGN KEY (TagId) REFERENCES TagTable(TagId), UNIQUE (EntryNum, TagID))");
|
||||
|
||||
list.add("CREATE TABLE IF NOT EXISTS SignatureTable (EntryNum INTEGER, SignerId BLOB, Signature BLOB,"
|
||||
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum))");
|
||||
+ " FOREIGN KEY (EntryNum) REFERENCES MsgTable(EntryNum), UNIQUE(SignerId, EntryNum))");
|
||||
|
||||
list.add("CREATE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId)");
|
||||
list.add("CREATE UNIQUE INDEX IF NOT EXISTS SignerIndex ON SignatureTable(SignerId, EntryNum)");
|
||||
|
||||
return list;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 14-Dec-15.
|
||||
*/
|
||||
public interface AsyncBulletinBoardClient extends BulletinBoardClient {
|
||||
|
||||
/**
|
||||
* Post a message to the bulletin board in an asynchronous manner
|
||||
* @param msg is the message to be posted
|
||||
* @param callback is a class containing methods to handle the result of the operation
|
||||
* @return a unique message ID for the message, that can be later used to retrieve the batch
|
||||
*/
|
||||
MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback);
|
||||
|
||||
/**
|
||||
* This method allows for sending large messages as a batch to the bulletin board
|
||||
* @param signerId is the canonical form for the ID of the sender of this batch
|
||||
* @param batchId is a unique (per signer) ID for this batch
|
||||
* @param batchDataList is the (canonically ordered) list of data comprising the batch message
|
||||
* @param startPosition is the location (in the batch) of the first entry in batchDataList (optionally used to continue interrupted post operations)
|
||||
* @param callback is a callback function class for handling results of the operation
|
||||
* @return a unique message ID for the entire message, that can be later used to retrieve the batch
|
||||
*/
|
||||
MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, ClientCallback<?> callback);
|
||||
|
||||
/**
|
||||
* Overloading of the postBatch method in which startPosition is set to the default value 0
|
||||
*/
|
||||
MessageID postBatch(byte[] signerId, int batchId, List<BatchData> batchDataList, ClientCallback<?> callback);
|
||||
|
||||
/**
|
||||
* Check how "safe" a given message is in an asynchronous manner
|
||||
* The result of the computation is a rank between 0.0 and 1.0 indicating the fraction of servers containing the message
|
||||
* @param id is the unique message identifier for retrieval
|
||||
* @param callback is a callback function class for handling results of the operation
|
||||
*/
|
||||
void getRedundancy(MessageID id, ClientCallback<Float> callback);
|
||||
|
||||
/**
|
||||
* Read all messages posted matching the given filter in an asynchronous manner
|
||||
* Note that if messages haven't been "fully posted", this might return a different
|
||||
* set of messages in different calls. However, messages that are fully posted
|
||||
* are guaranteed to be included.
|
||||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @param callback is a callback function class for handling results of the operation
|
||||
*/
|
||||
void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback);
|
||||
|
||||
/**
|
||||
* Read a given batch message from the bulletin board
|
||||
* @param signerId is the ID of the signer (sender) of the batch message
|
||||
* @param batchId is the unique (per signer) ID of the batch
|
||||
* @param callback is a callback class for handling the result of the operation
|
||||
*/
|
||||
void readBatch(byte[] signerId, int batchId, ClientCallback<SignedBatch> callback);
|
||||
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import meerkat.comm.*;
|
||||
import meerkat.comm.CommunicationException;
|
||||
import meerkat.protobuf.Voting.*;
|
||||
|
||||
import static meerkat.protobuf.BulletinBoardAPI.*;
|
||||
|
@ -24,26 +24,31 @@ public interface BulletinBoardClient {
|
|||
void init(BulletinBoardClientParams clientParams);
|
||||
|
||||
/**
|
||||
* Post a message to the bulletin board
|
||||
* @param msg
|
||||
* Post a message to the bulletin board in a synchronous manner
|
||||
* @param msg is the message to be posted
|
||||
* @return a unique message ID for the message, that can be later used to retrieve the batch
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
MessageID postMessage(BulletinBoardMessage msg, ClientCallback<?> callback);
|
||||
MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Check how "safe" a given message is
|
||||
* @param id
|
||||
* Check how "safe" a given message is in a synchronous manner
|
||||
* @param id is the unique message identifier for retrieval
|
||||
* @return a normalized "redundancy score" from 0 (local only) to 1 (fully published)
|
||||
*/
|
||||
void getRedundancy(MessageID id, ClientCallback<Float> callback);
|
||||
float getRedundancy(MessageID id);
|
||||
|
||||
/**
|
||||
* Read all messages posted matching the given filter
|
||||
* Read all messages posted matching the given filter in a synchronous manner
|
||||
* Note that if messages haven't been "fully posted", this might return a different
|
||||
* set of messages in different calls. However, messages that are fully posted
|
||||
* are guaranteed to be included.
|
||||
* @param filterList return only messages that match the filters (null means no filtering).
|
||||
* @return the list of messages
|
||||
*/
|
||||
void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback);
|
||||
List<BulletinBoardMessage> readMessages(MessageFilterList filterList);
|
||||
|
||||
/**
|
||||
* Closes all connections, if any.
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
package meerkat.bulletinboard;
|
||||
|
||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||
import meerkat.protobuf.Crypto.*;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 14-Dec-15.
|
||||
*
|
||||
* A data structure for holding both a batch message and its signature
|
||||
*/
|
||||
public class SignedBatch {
|
||||
|
||||
private List<BatchData> batchDataList;
|
||||
private Signature signature;
|
||||
|
||||
public SignedBatch() {
|
||||
batchDataList = new LinkedList<BatchData>();
|
||||
}
|
||||
|
||||
public SignedBatch(List<BatchData> newDataList) {
|
||||
this();
|
||||
appendBatchData(newDataList);
|
||||
}
|
||||
|
||||
public SignedBatch(List<BatchData> newDataList, Signature newSignature) {
|
||||
this(newDataList);
|
||||
signature = newSignature;
|
||||
}
|
||||
|
||||
public List<BatchData> getBatchDataList() {
|
||||
return batchDataList;
|
||||
}
|
||||
|
||||
public Signature getSignature() {
|
||||
return signature;
|
||||
}
|
||||
|
||||
public void appendBatchData(BatchData newBatchData) {
|
||||
batchDataList.add(newBatchData);
|
||||
}
|
||||
|
||||
public void appendBatchData(List<BatchData> newBatchDataList) {
|
||||
batchDataList.addAll(newBatchDataList);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -77,4 +77,31 @@ message MessageFilterList {
|
|||
// To be implemented using intersection ("AND") operations.
|
||||
repeated MessageFilter filter = 1;
|
||||
|
||||
}
|
||||
|
||||
// This message is used to start a batch transfer to the Bulletin Board Server
|
||||
message BeginBatchMessage {
|
||||
bytes signerId = 1; // Unique signer identifier
|
||||
int32 batchId = 2; // Unique identifier for the batch (unique per signer)
|
||||
repeated string tag = 3; // Tags for the batch message
|
||||
}
|
||||
|
||||
// This message is used to finalize and sign a batch transfer to the Bulletin Board Server
|
||||
message CloseBatchMessage {
|
||||
int32 batchId = 1; // Unique identifier for the batch (unique per signer)
|
||||
int32 batchLength = 2; // Number of messages in the batch
|
||||
meerkat.Signature sig = 3; // Signature on the (ordered) batch messages
|
||||
}
|
||||
|
||||
// Container for single batch message data
|
||||
message BatchData {
|
||||
bytes data = 1;
|
||||
}
|
||||
|
||||
// These messages comprise a batch message
|
||||
message BatchMessage {
|
||||
bytes signerId = 1; // Unique signer identifier
|
||||
int32 batchId = 2; // Unique identifier for the batch (unique per signer)
|
||||
int32 serialNum = 3; // Location of the message in the batch: starting from 0
|
||||
BatchData data = 4; // Actual data
|
||||
}
|
Loading…
Reference in New Issue