Implemented Synchronizer and Cached Client

Not tested yet
Cached-Client
arbel.peled 2016-04-16 19:50:09 +03:00
parent 9ed728fca7
commit 4c33e923b2
7 changed files with 177 additions and 51 deletions

View File

@ -14,7 +14,8 @@ import java.util.List;
* It provides asynchronous access to several remote servers, as well as a local cache
* Read operations are performed on the local server
* Batch reads are performed on the local server and, if they fail, also on the remote servers
* Write operations are performed first on the local server and then on the remotes
* Write operations are performed on the local server
* A Synchronizer is employed in order to keep the remote server up to date
* After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated
* The database also employs a synchronizer which makes sure local data is sent to the remote servers
*/
@ -23,6 +24,9 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
private final AsyncBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient;
private BulletinBoardSubscriber subscriber;
private BulletinBoardSynchronizer synchronizer;
private Thread syncThread;
private class SubscriptionStoreCallback implements FutureCallback<List<BulletinBoardMessage>> {
@ -62,15 +66,21 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
* @param localClient is a Client for the local instance
* @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations
* @param subscriber is a subscription service to the remote instance(s)
* @param queue is a client for a local deletable server to be used as a queue for not-yet-uploaded messages
*/
public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient,
AsyncBulletinBoardClient remoteClient,
BulletinBoardSubscriber subscriber) {
BulletinBoardSubscriber subscriber,
DeletableSubscriptionBulletinBoardClient queue) {
this.localClient = localClient;
this.remoteClient = remoteClient;
this.subscriber = subscriber;
this.synchronizer = new SimpleBulletinBoardSynchronizer();
synchronizer.init(queue, remoteClient);
syncThread = new Thread(synchronizer);
syncThread.start();
}
@Override
@ -302,8 +312,12 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
localClient.postMessage(msg);
return remoteClient.postMessage(msg);
return localClient.postMessage(msg);
}
@Override
public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException {
return localClient.postBatch(completeBatch);
}
@Override
@ -331,6 +345,7 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
public void close() {
localClient.close();
remoteClient.close();
synchronizer.stop();
}
@Override

View File

@ -57,7 +57,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
@Override
public Boolean call() throws Exception {
public Boolean call() throws CommunicationException {
return server.postMessage(msg).getValue();
}
@ -83,25 +83,21 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
@Override
public Boolean call() throws Exception {
public Boolean call() throws CommunicationException {
if (!server.beginBatch(completeBatch.getBeginBatchMessage()).getValue())
return false;
server.beginBatch(completeBatch.getBeginBatchMessage());
BatchMessage.Builder builder = BatchMessage.newBuilder()
.setSignerId(completeBatch.getSignature().getSignerId())
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId());
int i=0;
for (BatchData data : completeBatch.getBatchDataList()){
BatchMessage message = BatchMessage.newBuilder()
.setSignerId(completeBatch.getSignature().getSignerId())
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId())
.setSerialNum(i)
.setData(data)
.build();
if (!server.postBatchMessage(message).getValue())
return false;
server.postBatchMessage(builder.setSerialNum(i).setData(data).build());
i++;
}
return server.closeBatchMessage(completeBatch.getCloseBatchMessage()).getValue();
@ -501,17 +497,22 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
@Override
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
try {
MessagePoster poster = new MessagePoster(msg);
poster.call();
digest.update(msg);
return digest.digestAsMessageID();
} catch (Exception e) {
return null;
}
}
@Override
public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException {
CompleteBatchPoster poster = new CompleteBatchPoster(completeBatch);
poster.call();
digest.update(completeBatch);
return digest.digestAsMessageID();
}
@ -609,6 +610,16 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
@Override
public boolean deleteMessage(MessageID msgID) throws CommunicationException {
return server.deleteMessage(msgID).getValue();
}
@Override
public boolean deleteMessage(long entryNum) throws CommunicationException {
return server.deleteMessage(entryNum).getValue();
}
@Override
public void close() {
try {

View File

@ -2,8 +2,7 @@ package meerkat.bulletinboard;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import meerkat.bulletinboard.workers.singleserver.SingleServerReadBatchWorker;
import meerkat.bulletinboard.workers.singleserver.SingleServerReadMessagesWorker;
import meerkat.bulletinboard.workers.singleserver.*;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageInputStream;
import meerkat.crypto.Digest;
@ -38,7 +37,7 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
protected Client client;
protected Digest digest;
protected BatchDigest digest;
/**
* Stores database locations and initializes the web Client
@ -53,7 +52,8 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
client.register(ProtobufMessageBodyReader.class);
client.register(ProtobufMessageBodyWriter.class);
digest = new SHA256Digest();
// Wrap the Digest into a BatchDigest
digest = new GenericBatchDigest(new SHA256Digest());
}
@ -92,6 +92,50 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
return MessageID.newBuilder().setID(ByteString.copyFrom(digest.digest())).build();
}
@Override
public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException {
int pos = 0;
ByteString signerID = completeBatch.getSignature().getSignerId();
int batchID = completeBatch.getBeginBatchMessage().getBatchId();
// Post message to all databases
try {
for (String db : meerkatDBs) {
SingleServerBeginBatchWorker beginBatchWorker = new SingleServerBeginBatchWorker(db, completeBatch.getBeginBatchMessage(), 0);
beginBatchWorker.call();
BatchMessage.Builder builder = BatchMessage.newBuilder().setSignerId(signerID).setBatchId(batchID);
for (BatchData batchData : completeBatch.getBatchDataList()) {
SingleServerPostBatchWorker postBatchWorker =
new SingleServerPostBatchWorker(
db,
builder.setData(batchData).setSerialNum(pos).build(),
0);
postBatchWorker.call();
pos++;
}
SingleServerCloseBatchWorker closeBatchWorker = new SingleServerCloseBatchWorker(db, completeBatch.getCloseBatchMessage(), 0);
closeBatchWorker.call();
}
} catch (Exception e) { // Occurs only when server replies with valid status but invalid data
throw new CommunicationException("Error accessing database: " + e.getMessage());
}
digest.update(completeBatch);
return digest.digestAsMessageID();
}
/**
* Access each database and search for a given message ID
* Return the number of databases in which the message was found

View File

@ -26,6 +26,27 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
private static final int SLEEP_INTERVAL = 10000; // 10 Seconds
private class MessageDeleteCallback implements FutureCallback<Boolean> {
private final long entryNum;
public MessageDeleteCallback(long entryNum) {
this.entryNum = entryNum;
}
@Override
public void onSuccess(Boolean result) {
// Success: delete from database
localClient.deleteMessage(entryNum, null);
}
@Override
public void onFailure(Throwable t) {
// Ignore
}
}
private class SyncCallback implements FutureCallback<List<BulletinBoardMessage>> {
@Override
@ -41,27 +62,40 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
for (BulletinBoardMessage message : result){
if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)){
try {
// This is a batch message: need to upload batch data as well as the message itself
ByteString signerId = message.getSig(0).getSignerId();
long batchID = Long.parseLong(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX));
if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)) {
BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder().build();
// This is a batch message: need to upload batch data as well as the message itself
ByteString signerID = message.getSig(0).getSignerId();
int batchID = Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX));
localClient.readBatch(batchSpecificationMessage, null);
BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder()
.setSignerId(signerID)
.setBatchId(batchID)
.setStartPosition(0)
.build();
}
else{
// This is a regular message: post it
try {
CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage);
remoteClient.postBatch(completeBatch);
} else {
// This is a regular message: post it
remoteClient.postMessage(message);
} catch (CommunicationException e) {
newStatus = SyncStatus.SERVER_ERROR;
}
localClient.deleteMessage(message.getEntryNum());
} catch (CommunicationException e) {
updateSyncStatus(SyncStatus.SERVER_ERROR);
}
}
}
@ -144,7 +178,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
@Override
public void run() {
if (syncStatus != SyncStatus.STOPPED) {
if (syncStatus == SyncStatus.STOPPED) {
updateSyncStatus(SyncStatus.PENDING);
SyncCallback callback = new SyncCallback();

View File

@ -34,8 +34,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
private ListeningScheduledExecutorService executorService;
protected BatchDigest batchDigest;
private long lastServerErrorTime;
private final long failDelayInMilliseconds;
@ -347,9 +345,6 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
// Perform usual setup
super.init(clientParams);
// Wrap the Digest into a BatchDigest
batchDigest = new GenericBatchDigest(digest);
// Remove all but first DB address
String dbAddress = meerkatDBs.get(0);
meerkatDBs = new LinkedList<>();
@ -367,9 +362,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
scheduleWorker(worker, new RetryCallback<>(worker, callback));
// Calculate the correct message ID and return it
batchDigest.reset();
batchDigest.update(msg.getMsg());
return batchDigest.digestAsMessageID();
digest.reset();
digest.update(msg.getMsg());
return digest.digestAsMessageID();
}
@ -435,9 +430,9 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
new BeginBatchCallback(completeBatch, callback)
);
batchDigest.update(completeBatch);
digest.update(completeBatch);
return batchDigest.digestAsMessageID();
return digest.digestAsMessageID();
}

View File

@ -27,6 +27,14 @@ public interface BulletinBoardClient {
*/
MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException;
/**
* Perform an end-to-end post of a signed batch message
* @param completeBatch contains all the data of the batch including the meta-data and the signature
* @return a unique identifier for the batch message
* @throws CommunicationException
*/
public MessageID postBatch(CompleteBatch completeBatch) throws CommunicationException;
/**
* Check how "safe" a given message is in a synchronous manner
* @param id is the unique message identifier for retrieval

View File

@ -13,18 +13,37 @@ import meerkat.protobuf.BulletinBoardAPI.*;
public interface BulletinBoardMessageDeleter {
/**
* Deletes a message from a Bulletin Board Server
* Deletes a message from a Bulletin Board Server in a possibly asynchronous manner
* Logs this action
* @param msgID is the ID of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback);
/**
* Deletes a message from the Bulletin Board
* Deletes a message from the Bulletin Board in a possibly asynchronous manner
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback);
/**
* Deletes a message from a Bulletin Board Server in a synchronous manner
* Logs this action
* @param msgID is the ID of the message to delete
* @return TRUE if the message was deleted and FALSE if it did not exist on the server
* @throws CommunicationException when an error occurs
*/
public boolean deleteMessage(MessageID msgID) throws CommunicationException;
/**
* Deletes a message from the Bulletin Board in a synchronous manner
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @return TRUE if the message was deleted and FALSE if it did not exist on the server
* @throws CommunicationException when an error occurs
*/
public boolean deleteMessage(long entryNum) throws CommunicationException;
}