diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java index ac61114..15beec7 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -5,6 +5,7 @@ import com.google.protobuf.ByteString; import meerkat.comm.CommunicationException; import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.Voting.*; +import meerkat.util.BulletinBoardUtils; import java.util.List; @@ -30,13 +31,13 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien private class SubscriptionStoreCallback implements FutureCallback> { - private final FutureCallback callback; + private final FutureCallback> callback; public SubscriptionStoreCallback(){ callback = null; } - public SubscriptionStoreCallback(FutureCallback callback){ + public SubscriptionStoreCallback(FutureCallback> callback){ this.callback = callback; } @@ -44,7 +45,30 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien public void onSuccess(List result) { for (BulletinBoardMessage msg : result) { try { - localClient.postMessage(msg); + + if (msg.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)) { + + // This is a batch message: need to upload batch data as well as the message itself + ByteString signerID = msg.getSig(0).getSignerId(); + int batchID = Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(msg, BulletinBoardConstants.BATCH_ID_TAG_PREFIX)); + + BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder() + .setSignerId(signerID) + .setBatchId(batchID) + .setStartPosition(0) + .build(); + + CompleteBatch completeBatch = localClient.readBatch(batchSpecificationMessage); + + localClient.postBatch(completeBatch); + + } else { + + // This is a regular message: post it + localClient.postMessage(msg); + + } + } catch (CommunicationException ignored) { // TODO: log } @@ -346,16 +370,21 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien localClient.close(); remoteClient.close(); synchronizer.stop(); + try { + syncThread.join(); + } catch (InterruptedException e) { + //TODO: log interruption + } } @Override public void subscribe(MessageFilterList filterList, FutureCallback> callback) { - subscriber.subscribe(filterList, callback); + subscriber.subscribe(filterList, new SubscriptionStoreCallback(callback)); } @Override public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { - subscriber.subscribe(filterList, startEntry, callback); + subscriber.subscribe(filterList, startEntry, new SubscriptionStoreCallback(callback)); } } \ No newline at end of file