Added message counting ability to the server (but not to the client)

Added synchronous CompleteBatch read by the client
Started implementing the synchronizer
Added support for null callbacks
Cached-Client
arbel.peled 2016-04-14 09:20:11 +03:00
parent 48b2b9efa2
commit 9ed728fca7
14 changed files with 532 additions and 94 deletions

View File

@ -84,7 +84,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -101,7 +102,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -118,7 +120,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -136,7 +139,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -153,7 +157,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -171,7 +176,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -188,7 +194,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -205,7 +212,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
});
@ -233,7 +241,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
localClient.readBatch(batchSpecificationMessage, new FutureCallback<CompleteBatch>() {
@Override
public void onSuccess(CompleteBatch result) {
callback.onSuccess(result); // Read from local client was successful
if (callback != null)
callback.onSuccess(result); // Read from local client was successful
}
@Override
@ -255,7 +264,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
public void onFailure(Throwable t) {}
});
callback.onSuccess(result);
if (callback != null)
callback.onSuccess(result);
}
@ -263,7 +273,8 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
public void onFailure(Throwable t) {
// Read from remote was unsuccessful: report error
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
@ -301,11 +312,16 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
}
@Override
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) throws CommunicationException {
subscriber.subscribe(filterList, new SubscriptionStoreCallback());
return localClient.readMessages(filterList);
}
@Override
public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException {
return localClient.readBatch(batchSpecificationMessage);
}
@Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
return localClient.generateSyncQuery(generateSyncQueryParams);
@ -327,12 +343,4 @@ public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClien
subscriber.subscribe(filterList, startEntry, callback);
}
public int syncStatus(){
return 0;
}
public void reSync(){
}
}

View File

@ -291,6 +291,31 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
private class BatchDataReader implements Callable<List<BatchData>> {
private final BatchSpecificationMessage batchSpecificationMessage;
public BatchDataReader(BatchSpecificationMessage batchSpecificationMessage) {
this.batchSpecificationMessage = batchSpecificationMessage;
}
@Override
public List<BatchData> call() throws Exception {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
MessageOutputStream<BatchData> outputStream = new MessageOutputStream<>(byteOutputStream);
server.readBatch(batchSpecificationMessage, outputStream);
MessageInputStream<BatchData> inputStream =
MessageInputStreamFactory.createMessageInputStream(
new ByteArrayInputStream(byteOutputStream.toByteArray()),
BatchData.class);
return inputStream.asList();
}
}
@Override
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
Futures.addCallback(executorService.submit(new MessageReader(filterList)), callback);
@ -310,7 +335,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
public void onSuccess(List<BulletinBoardMessage> result) {
// Report new messages to user
callback.onSuccess(result);
if (callback != null)
callback.onSuccess(result);
MessageFilterList.Builder filterBuilder = filterList.toBuilder();
@ -339,7 +365,8 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
public void onFailure(Throwable t) {
// Notify caller about failure and terminate subscription
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}
@ -503,7 +530,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
}
@Override
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) throws CommunicationException{
try {
@ -511,7 +538,40 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
return reader.call();
} catch (Exception e){
return null;
throw new CommunicationException("Error reading from server");
}
}
@Override
public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException {
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.TAG)
.setTag(BulletinBoardConstants.BATCH_TAG)
.build())
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.TAG)
.setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId())
.build())
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.SIGNER_ID)
.setId(batchSpecificationMessage.getSignerId())
.build())
.build();
BulletinBoardMessage batchMessage = readMessages(filterList).get(0);
BatchDataReader batchDataReader = new BatchDataReader(batchSpecificationMessage);
try {
List<BatchData> batchDataList = batchDataReader.call();
return new CompleteBatch(batchMessage, batchDataList);
} catch (Exception e) {
throw new CommunicationException("Error reading batch");
}
}
@ -525,9 +585,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(msgID).getValue());
Boolean deleted = server.deleteMessage(msgID).getValue();
if (callback != null)
callback.onSuccess(deleted);
} catch (CommunicationException e) {
callback.onFailure(e);
if (callback != null)
callback.onFailure(e);
}
}
@ -536,9 +599,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(entryNum).getValue());
Boolean deleted = server.deleteMessage(entryNum).getValue();
if (callback != null)
callback.onSuccess(deleted);
} catch (CommunicationException e) {
callback.onFailure(e);
if (callback != null)
callback.onFailure(e);
}
}

View File

@ -74,7 +74,8 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
*/
protected void succeed(OUT result){
if (returnedResult.compareAndSet(false, true)) {
futureCallback.onSuccess(result);
if (futureCallback != null)
futureCallback.onSuccess(result);
}
}
@ -85,7 +86,8 @@ public abstract class MultiServerWorker<IN, OUT> extends BulletinClientWorker<IN
*/
protected void fail(Throwable t){
if (returnedResult.compareAndSet(false, true)) {
futureCallback.onFailure(t);
if (futureCallback != null)
futureCallback.onFailure(t);
}
}

View File

@ -2,6 +2,8 @@ package meerkat.bulletinboard;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import meerkat.bulletinboard.workers.singleserver.SingleServerReadBatchWorker;
import meerkat.bulletinboard.workers.singleserver.SingleServerReadMessagesWorker;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageInputStream;
import meerkat.crypto.Digest;
@ -138,30 +140,69 @@ public class SimpleBulletinBoardClient implements BulletinBoardClient{
@Override
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
WebTarget webTarget;
Response response;
BulletinBoardMessageList messageList;
// Replace null filter list with blank one.
if (filterList == null){
filterList = MessageFilterList.newBuilder().build();
filterList = MessageFilterList.getDefaultInstance();
}
for (String db : meerkatDBs) {
try {
webTarget = client.target(db).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(filterList, Constants.MEDIATYPE_PROTOBUF));
SingleServerReadMessagesWorker worker = new SingleServerReadMessagesWorker(db, filterList, 0);
messageList = response.readEntity(BulletinBoardMessageList.class);
List<BulletinBoardMessage> result = worker.call();
if (messageList != null){
return messageList.getMessageList();
}
return result;
} catch (Exception e) {}
} catch (Exception ignored) {}
}
return null;
}
@Override
public CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException {
// Create job with no retries for retrieval of the Bulletin Board Message that defines the batch
MessageFilterList filterList = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.TAG)
.setTag(BulletinBoardConstants.BATCH_TAG)
.build())
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.TAG)
.setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + batchSpecificationMessage.getBatchId())
.build())
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.SIGNER_ID)
.setId(batchSpecificationMessage.getSignerId())
.build())
.build();
for (String db : meerkatDBs) {
try {
SingleServerReadMessagesWorker messagesWorker = new SingleServerReadMessagesWorker(db, filterList, 0);
List<BulletinBoardMessage> messages = messagesWorker.call();
if (messages == null || messages.size() < 1)
continue;
BulletinBoardMessage batchMessage = messages.get(0);
SingleServerReadBatchWorker batchWorker = new SingleServerReadBatchWorker(db, batchSpecificationMessage, 0);
List<BatchData> batchDataList = batchWorker.call();
CompleteBatch result = new CompleteBatch(batchMessage, batchDataList);
return result;
} catch (Exception ignored) {}
}
return null;

View File

@ -0,0 +1,188 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.ByteString;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.util.BulletinBoardUtils;
import java.util.LinkedList;
import java.util.List;
/**
* Created by Arbel on 13/04/2016.
* Simple, straightforward implementation of the {@link BulletinBoardSynchronizer} interface
*/
public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronizer {
private DeletableSubscriptionBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient;
private volatile SyncStatus syncStatus;
private List<FutureCallback<Integer>> messageCountCallbacks;
private List<FutureCallback<SyncStatus>> syncStatusCallbacks;
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
private static final int SLEEP_INTERVAL = 10000; // 10 Seconds
private class SyncCallback implements FutureCallback<List<BulletinBoardMessage>> {
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
SyncStatus newStatus = SyncStatus.PENDING;
if (result.size() == 0) {
newStatus = SyncStatus.SYNCHRONIZED;
}
else{ // Upload messages
for (BulletinBoardMessage message : result){
if (message.getMsg().getTagList().contains(BulletinBoardConstants.BATCH_TAG)){
// This is a batch message: need to upload batch data as well as the message itself
ByteString signerId = message.getSig(0).getSignerId();
long batchID = Long.parseLong(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX));
BatchSpecificationMessage batchSpecificationMessage = BatchSpecificationMessage.newBuilder().build();
localClient.readBatch(batchSpecificationMessage, null);
}
else{
// This is a regular message: post it
try {
remoteClient.postMessage(message);
} catch (CommunicationException e) {
newStatus = SyncStatus.SERVER_ERROR;
}
}
}
}
updateSyncStatus(newStatus);
}
@Override
public void onFailure(Throwable t) {
updateSyncStatus(SyncStatus.SERVER_ERROR);
}
}
public SimpleBulletinBoardSynchronizer() {
this.syncStatus = SyncStatus.STOPPED;
}
private synchronized void updateSyncStatus(SyncStatus newStatus) {
if (newStatus != syncStatus){
syncStatus = newStatus;
for (FutureCallback<SyncStatus> callback : syncStatusCallbacks){
if (callback != null)
callback.onSuccess(syncStatus);
}
}
}
@Override
public void init(DeletableSubscriptionBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient) {
updateSyncStatus(SyncStatus.STOPPED);
this.localClient = localClient;
this.remoteClient = remoteClient;
messageCountCallbacks = new LinkedList<>();
syncStatusCallbacks = new LinkedList<>();
}
@Override
public SyncStatus getSyncStatus() {
return syncStatus;
}
@Override
public void subscribeToSyncStatus(FutureCallback<SyncStatus> callback) {
syncStatusCallbacks.add(callback);
}
@Override
public List<BulletinBoardMessage> getRemainingMessages() throws CommunicationException{
return localClient.readMessages(EMPTY_FILTER);
}
@Override
public void getRemainingMessages(FutureCallback<List<BulletinBoardMessage>> callback) {
localClient.readMessages(EMPTY_FILTER, callback);
}
@Override
public long getRemainingMessagesCount() throws CommunicationException {
return localClient.readMessages(EMPTY_FILTER).size();
}
@Override
public void subscribeToRemainingMessagesCount(FutureCallback<Integer> callback) {
messageCountCallbacks.add(callback);
}
@Override
public void run() {
if (syncStatus != SyncStatus.STOPPED) {
updateSyncStatus(SyncStatus.PENDING);
SyncCallback callback = new SyncCallback();
while (syncStatus != SyncStatus.STOPPED) {
try {
do {
localClient.readMessages(EMPTY_FILTER, callback);
} while (syncStatus == SyncStatus.PENDING);
synchronized (this) {
this.wait(SLEEP_INTERVAL);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void nudge() {
synchronized (this) {
this.notify();
}
}
@Override
public void stop() {
updateSyncStatus(SyncStatus.STOPPED);
}
}

View File

@ -97,7 +97,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
@Override
public void onSuccess(T result) {
futureCallback.onSuccess(result);
if (futureCallback != null)
futureCallback.onSuccess(result);
}
@Override
@ -115,7 +116,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
scheduleWorker(worker, this);
} else {
// No more retries: notify caller about failure
futureCallback.onFailure(t);
if (futureCallback != null)
futureCallback.onFailure(t);
}
}
@ -150,7 +152,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
}
if (batchDataRemaining.decrementAndGet() == 0){
callback.onSuccess(this.aggregatedResult.get());
if (callback != null)
callback.onSuccess(this.aggregatedResult.get());
}
}
@ -158,7 +161,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
public void onFailure(Throwable t) {
// Notify caller about failure
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}
@ -195,26 +199,16 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
if (remainingQueries.decrementAndGet() == 0){
String batchIdStr = BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX);
if (batchIdStr == null){
callback.onFailure(new CommunicationException("Server returned invalid message with no Batch ID tag"));
}
BeginBatchMessage beginBatchMessage =
BeginBatchMessage.newBuilder()
.setSignerId(batchMessage.getSig(0).getSignerId())
.setBatchId(Integer.parseInt(batchIdStr))
.addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(prefixes)))
.build();
callback.onSuccess(new CompleteBatch(beginBatchMessage, batchDataList, batchMessage.getSig(0)));
if (callback != null)
callback.onSuccess(new CompleteBatch(batchMessage, batchDataList));
}
}
protected void fail(Throwable t) {
if (failed.compareAndSet(false, true)) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}
@ -289,7 +283,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
public void onSuccess(List<BulletinBoardMessage> result) {
// Report new messages to user
callback.onSuccess(result);
if (callback != null)
callback.onSuccess(result);
// Remove last filter from list (MIN_ENTRY one)
filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1);
@ -315,7 +310,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
fail();
// Notify caller about failure and terminate subscription
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}
@ -397,7 +393,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}
@ -425,7 +422,8 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
if (callback != null)
callback.onFailure(t);
}
}

View File

@ -29,6 +29,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
private AtomicBoolean isSyncInProgress;
private Semaphore rescheduleSemaphore;
private AtomicBoolean stopped;
private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f};
public ThreadedBulletinBoardSubscriber(Collection<SubscriptionBulletinBoardClient> clients, BulletinBoardClient localClient) {
@ -44,6 +46,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
isSyncInProgress = new AtomicBoolean(false);
rescheduleSemaphore = new Semaphore(1);
stopped = new AtomicBoolean(false);
}
/**
@ -131,7 +135,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
//TODO: log
callback.onFailure(e); // Hard error: Cannot guarantee subscription safety
if (callback != null)
callback.onFailure(e); // Hard error: Cannot guarantee subscription safety
}
@ -217,7 +222,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
public void onSuccess(List<BulletinBoardMessage> result) {
// Propagate result to caller
callback.onSuccess(result);
if (callback != null)
callback.onSuccess(result);
// Renew subscription
@ -249,7 +255,8 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
public void onSuccess(List<BulletinBoardMessage> result) {
// Propagate result to caller
callback.onSuccess(result);
if (callback != null)
callback.onSuccess(result);
}

View File

@ -679,6 +679,30 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
}
@Override
public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException {
BulletinBoardMessageList.Builder resultListBuilder = BulletinBoardMessageList.newBuilder();
// SQL length is roughly 50 characters per filter + 50 for the query itself
StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1));
// Check if Tag/Signature tables are required for filtering purposes
sqlBuilder.append(sqlQueryProvider.getSQLString(QueryType.COUNT_MESSAGES));
// Get conditions
SQLAndParameters sqlAndParameters = getSQLFromFilters(filterList);
sqlBuilder.append(sqlAndParameters.sql);
// Run query and stream the output using a MessageCallbackHandler
List<Long> count = jdbcTemplate.query(sqlBuilder.toString(), sqlAndParameters.parameters, new LongMapper());
return IntMsg.newBuilder().setValue(count.get(0).intValue()).build();
}
/**
* This method returns a string representation of the tag associated with a batch ID
@ -713,23 +737,9 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
.build())
.build();
// SQL length is roughly 50 characters per filter + 50 for the query itself
StringBuilder sqlBuilder = new StringBuilder(50 * (filterList.getFilterCount() + 1));
int count = getMessageCount(filterList).getValue();
// Check if Tag/Signature tables are required for filtering purposes
sqlBuilder.append(sqlQueryProvider.getSQLString(QueryType.COUNT_MESSAGES));
// Get conditions
SQLAndParameters sqlAndParameters = getSQLFromFilters(filterList);
sqlBuilder.append(sqlAndParameters.sql);
// Run query and stream the output using a MessageCallbackHandler
List<Long> count = jdbcTemplate.query(sqlBuilder.toString(), sqlAndParameters.parameters, new LongMapper());
return (count.size() > 0) && (count.get(0) > 0);
return count > 0;
}

View File

@ -15,14 +15,12 @@ import meerkat.bulletinboard.sqlserver.MySQLQueryProvider;
import meerkat.bulletinboard.sqlserver.SQLiteQueryProvider;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageOutputStream;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import static meerkat.bulletinboard.BulletinBoardConstants.*;
import static meerkat.rest.Constants.*;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
/**
* An implementation of the BulletinBoardServer which functions as a WebApp
@ -99,6 +97,16 @@ public class BulletinBoardWebApp implements BulletinBoardServer, ServletContextL
bulletinBoard.readMessages(filterList, out);
}
@Path(COUNT_MESSAGES_PATH)
@POST
@Consumes(MEDIATYPE_PROTOBUF)
@Produces(MEDIATYPE_PROTOBUF)
@Override
public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException {
init();
return bulletinBoard.getMessageCount(filterList);
}
@Path(READ_MESSAGES_PATH)
@POST

View File

@ -42,7 +42,15 @@ public interface BulletinBoardClient {
* @param filterList return only messages that match the filters (null means no filtering)
* @return the list of messages
*/
List<BulletinBoardMessage> readMessages(MessageFilterList filterList);
List<BulletinBoardMessage> readMessages(MessageFilterList filterList) throws CommunicationException;
/**
* Read a given batch message from the bulletin board
* @param batchSpecificationMessage contains the data required to specify a single batch instance
* @return the complete batch
* @throws CommunicationException
*/
CompleteBatch readBatch(BatchSpecificationMessage batchSpecificationMessage) throws CommunicationException;
/**
* Create a SyncQuery to test against that corresponds with the current server state for a specific filter list

View File

@ -10,6 +10,7 @@ public interface BulletinBoardConstants {
public static final String BULLETIN_BOARD_SERVER_PATH = "/bbserver";
public static final String GENERATE_SYNC_QUERY_PATH = "/generatesyncquery";
public static final String READ_MESSAGES_PATH = "/readmessages";
public static final String COUNT_MESSAGES_PATH = "/countmessages";
public static final String READ_BATCH_PATH = "/readbatch";
public static final String POST_MESSAGE_PATH = "/postmessage";
public static final String BEGIN_BATCH_PATH = "/beginbatch";

View File

@ -32,13 +32,21 @@ public interface BulletinBoardServer{
public BoolMsg postMessage(BulletinBoardMessage msg) throws CommunicationException;
/**
* Read all messages posted matching the given filter
* Read all posted messages matching the given filters
* @param filterList return only messages that match the filters (empty list or null means no filtering)
* @param out is an output stream into which the matching messages are written
* @throws CommunicationException on DB connection error
*/
public void readMessages(MessageFilterList filterList, MessageOutputStream<BulletinBoardMessage> out) throws CommunicationException;
/**
* Return the number of posted messages matching the given filters
* @param filterList count only messages that match the filters (empty list or null means no filtering)
* @return an IntMsg containing the number of messages that match the filter
* @throws CommunicationException on DB connection error
*/
public IntMsg getMessageCount(MessageFilterList filterList) throws CommunicationException;
/**
* Informs server about a new batch message
* @param message contains the required data about the new batch

View File

@ -1,22 +1,84 @@
package meerkat.bulletinboard;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
import com.google.common.util.concurrent.FutureCallback;
import java.util.List;
/**
* Created by Arbel Deutsch Peled on 08-Mar-16.
* This interface defines the behaviour of a bulletin board synchronizer
* This is used to make sure that data in a specific instance of a bulletin board server is duplicated to a sufficient percentage of the other servers
*/
public interface BulletinBoardSynchronizer extends Runnable{
public interface BulletinBoardSynchronizer extends Runnable {
public enum SyncStatus{
SYNCHRONIZED, // No more messages to upload
PENDING, // Synchronizer is uploading data
SERVER_ERROR, // Synchronizer encountered an error while uploading
STOPPED // Stopped/Not started by user
}
/**
*
* @param localClient is a client for the local DB instance
* @param remoteClient is a client for the remote DBs
* @param minRedundancy
* Initializes the synchronizer with the required data to function properly
* @param localClient is a client for the temporary local storage server which contains only data to be uploaded
* @param remoteClient is a client for the remote servers into which the data needs to be uploaded
*/
public void init(BulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient, float minRedundancy);
public void init(DeletableSubscriptionBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient);
/**
* Returns the current server synchronization status
* @return the current synchronization status
*/
public SyncStatus getSyncStatus();
/**
* Creates a subscription to sync status changes
* @param callback is the handler for any status changes
*/
public void subscribeToSyncStatus(FutureCallback<SyncStatus> callback);
/**
* Returns the messages which have not yet been synchronized
* @return the list of messages remaining to be synchronized
*/
public List<BulletinBoardMessage> getRemainingMessages() throws CommunicationException;
/**
* Asynchronously returns the messages which have not yet been synchronized
* @param callback is the handler for the list of messages
*/
public void getRemainingMessages(FutureCallback<List<BulletinBoardMessage>> callback);
/**
* Returns the current number of unsynchronized messages
* @return the current synchronization status
*/
public long getRemainingMessagesCount() throws CommunicationException;
/**
* Creates a subscription to changes in the number of unsynchronized messages
* @param callback is the handler for any status changes
*/
public void subscribeToRemainingMessagesCount(FutureCallback<Integer> callback);
/**
* Starts the synchronization
*/
@Override
public void run();
/**
* Lets the Synchronizer know that there is new data to be uploaded
* This is used to reduce the latency between local data-writes and uploads to the remote servers
*/
public void nudge();
/**
* Stops the synchronization
*/
public void stop();
}

View File

@ -3,8 +3,9 @@ package meerkat.bulletinboard;
import com.google.protobuf.Timestamp;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto.*;
import meerkat.util.BulletinBoardMessageComparator;
import meerkat.util.BulletinBoardUtils;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -49,6 +50,36 @@ public class CompleteBatch {
this.timestamp = timestamp;
}
/**
* Combines the actual Bulletin board representation of a batch into a CompleteBatch object
* @param batchMessage is the BulletinBoard message that specifies the batch
* @param batchDataList is the (ordered) list of batch data
*/
public CompleteBatch(BulletinBoardMessage batchMessage, List<BatchData> batchDataList) throws IllegalArgumentException{
final String[] PREFIXES = {
BulletinBoardConstants.BATCH_ID_TAG_PREFIX,
BulletinBoardConstants.BATCH_TAG};
String batchIdStr = BulletinBoardUtils.findTagWithPrefix(batchMessage, BulletinBoardConstants.BATCH_ID_TAG_PREFIX);
if (batchIdStr == null){
throw new IllegalArgumentException("");
}
this.beginBatchMessage =
BeginBatchMessage.newBuilder()
.setSignerId(batchMessage.getSig(0).getSignerId())
.setBatchId(Integer.parseInt(batchIdStr))
.addAllTag(BulletinBoardUtils.removePrefixTags(batchMessage, Arrays.asList(PREFIXES)))
.build();
this.batchDataList = batchDataList;
this.signature = batchMessage.getSig(0);
this.timestamp = batchMessage.getMsg().getTimestamp();
}
public BeginBatchMessage getBeginBatchMessage() {
return beginBatchMessage;
}