Fixed some subscription functionality of the CachedClient

Cached-Client
arbel.peled 2016-06-02 13:01:41 +03:00
parent fe209f6b5a
commit 229cbfd48f
1 changed files with 34 additions and 5 deletions

View File

@ -5,6 +5,7 @@ import com.google.protobuf.ByteString;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.*;
import meerkat.util.BulletinBoardUtils;
import java.util.List;
@ -30,13 +31,13 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
private class SubscriptionStoreCallback implements FutureCallback<List<BulletinBoardMessage>> {
private final FutureCallback<?> callback;
private final FutureCallback<List<BulletinBoardMessage>> callback;
public SubscriptionStoreCallback(){
callback = null;
}
public SubscriptionStoreCallback(FutureCallback<?> callback){
public SubscriptionStoreCallback(FutureCallback<List<BulletinBoardMessage>> callback){
this.callback = callback;
}
@ -44,7 +45,30 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
public void onSuccess(List<BulletinBoardMessage> result) {
for (BulletinBoardMessage msg : result) {
try {
localClient.postMessage(msg);
if (msg.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)) {
// This is a batch message: need to upload batch data as well as the message itself
ByteString signerID = msg.getSig(0).getSignerId();
int batchID = Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(msg, BulletinBoardConstants.BATCH_ID_TAG_PREFIX));
BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder()
.setSignerId(signerID)
.setBatchId(batchID)
.setStartPosition(0)
.build();
CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage);
localClient.postBatch(completeBatch);
} else {
// This is a regular message: post it
localClient.postMessage(msg);
}
} catch (CommunicationException ignored) {
// TODO: log
}
@ -346,16 +370,21 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
localClient.close();
remoteClient.close();
synchronizer.stop();
try {
syncThread.join();
} catch (InterruptedException e) {
//TODO: log interruption
}
}
@Override
public void subscribe(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, callback);
subscriber.subscribe(filterList, new SubscriptionStoreCallback(callback));
}
@Override
public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, startEntry, callback);
subscriber.subscribe(filterList, startEntry, new SubscriptionStoreCallback(callback));
}
}