242 lines
6.7 KiB
Java
242 lines
6.7 KiB
Java
package meerkat.bulletinboard;
|
|
|
|
import com.google.common.util.concurrent.FutureCallback;
|
|
import com.google.protobuf.ByteString;
|
|
import meerkat.comm.CommunicationException;
|
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
|
import meerkat.util.BulletinBoardUtils;
|
|
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
/**
|
|
* Created by Arbel on 13/04/2016.
|
|
* Simple, straightforward implementation of the {@link BulletinBoardSynchronizer} interface
|
|
*/
|
|
public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronizer {
|
|
|
|
private DeletableSubscriptionBulletinBoardClient localClient;
|
|
private AsyncBulletinBoardClient remoteClient;
|
|
|
|
private AtomicBoolean running;
|
|
private volatile SyncStatus syncStatus;
|
|
|
|
private List<FutureCallback<Integer>> messageCountCallbacks;
|
|
private List<FutureCallback<SyncStatus>> syncStatusCallbacks;
|
|
|
|
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
|
|
private static final int DEFAULT_SLEEP_INTERVAL = 10000; // 10 Seconds
|
|
private static final int DEFAULT_WAIT_CAP = 300000; // 5 minutes wait before deciding that the sync has failed fatally
|
|
|
|
private final int SLEEP_INTERVAL;
|
|
private final int WAIT_CAP;
|
|
|
|
private Semaphore semaphore;
|
|
|
|
private class SyncCallback implements FutureCallback<List<BulletinBoardMessage>> {
|
|
|
|
@Override
|
|
public void onSuccess(List<BulletinBoardMessage> result) {
|
|
|
|
// Notify Message Count callbacks if needed
|
|
|
|
if (syncStatus != SyncStatus.SYNCHRONIZED || result.size() > 0) {
|
|
|
|
for (FutureCallback<Integer> callback : messageCountCallbacks){
|
|
callback.onSuccess(result.size());
|
|
}
|
|
|
|
}
|
|
|
|
// Handle upload and status change
|
|
|
|
SyncStatus newStatus = SyncStatus.PENDING;
|
|
|
|
if (result.size() == 0) {
|
|
newStatus = SyncStatus.SYNCHRONIZED;
|
|
semaphore.release();
|
|
}
|
|
|
|
else{ // Upload messages
|
|
|
|
for (BulletinBoardMessage message : result){
|
|
|
|
try {
|
|
|
|
if (message.getMsg().getDataTypeCase() == UnsignedBulletinBoardMessage.DataTypeCase.MSGID) {
|
|
|
|
// This is a batch message: need to upload batch data as well as the message itself
|
|
|
|
BulletinBoardMessage completeMsg = localClient.readBatchData(message);
|
|
|
|
remoteClient.postMessage(completeMsg);
|
|
|
|
localClient.deleteMessage(completeMsg.getEntryNum());
|
|
|
|
|
|
} else {
|
|
|
|
// This is a regular message: post it
|
|
remoteClient.postMessage(message);
|
|
|
|
localClient.deleteMessage(message.getEntryNum());
|
|
|
|
}
|
|
|
|
} catch (CommunicationException e) {
|
|
// This is an error with the local server
|
|
// TODO: log
|
|
updateSyncStatus(SyncStatus.SERVER_ERROR);
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
updateSyncStatus(newStatus);
|
|
|
|
}
|
|
|
|
@Override
|
|
public void onFailure(Throwable t) {
|
|
|
|
updateSyncStatus(SyncStatus.SERVER_ERROR);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
public SimpleBulletinBoardSynchronizer(int sleepInterval, int waitCap) {
|
|
this.syncStatus = SyncStatus.STOPPED;
|
|
this.SLEEP_INTERVAL = sleepInterval;
|
|
this.WAIT_CAP = waitCap;
|
|
this.running = new AtomicBoolean(false);
|
|
}
|
|
|
|
public SimpleBulletinBoardSynchronizer() {
|
|
this(DEFAULT_SLEEP_INTERVAL, DEFAULT_WAIT_CAP);
|
|
}
|
|
|
|
private synchronized void updateSyncStatus(SyncStatus newStatus) {
|
|
|
|
if (!running.get()) {
|
|
|
|
newStatus = SyncStatus.STOPPED;
|
|
|
|
}
|
|
|
|
if (newStatus != syncStatus){
|
|
|
|
syncStatus = newStatus;
|
|
|
|
for (FutureCallback<SyncStatus> callback : syncStatusCallbacks){
|
|
if (callback != null)
|
|
callback.onSuccess(syncStatus);
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@Override
|
|
public void init(DeletableSubscriptionBulletinBoardClient localClient, AsyncBulletinBoardClient remoteClient) {
|
|
|
|
updateSyncStatus(SyncStatus.STOPPED);
|
|
|
|
this.localClient = localClient;
|
|
this.remoteClient = remoteClient;
|
|
|
|
messageCountCallbacks = new LinkedList<>();
|
|
syncStatusCallbacks = new LinkedList<>();
|
|
|
|
semaphore = new Semaphore(0);
|
|
|
|
}
|
|
|
|
@Override
|
|
public SyncStatus getSyncStatus() {
|
|
return syncStatus;
|
|
}
|
|
|
|
@Override
|
|
public void subscribeToSyncStatus(FutureCallback<SyncStatus> callback) {
|
|
syncStatusCallbacks.add(callback);
|
|
}
|
|
|
|
@Override
|
|
public List<BulletinBoardMessage> getRemainingMessages() throws CommunicationException{
|
|
return localClient.readMessages(EMPTY_FILTER);
|
|
}
|
|
|
|
@Override
|
|
public void getRemainingMessages(FutureCallback<List<BulletinBoardMessage>> callback) {
|
|
localClient.readMessages(EMPTY_FILTER, callback);
|
|
}
|
|
|
|
@Override
|
|
public long getRemainingMessagesCount() throws CommunicationException {
|
|
return localClient.readMessages(EMPTY_FILTER).size();
|
|
}
|
|
|
|
@Override
|
|
public void subscribeToRemainingMessagesCount(FutureCallback<Integer> callback) {
|
|
messageCountCallbacks.add(callback);
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
|
|
if (running.compareAndSet(false,true)){
|
|
|
|
updateSyncStatus(SyncStatus.PENDING);
|
|
SyncCallback callback = new SyncCallback();
|
|
|
|
while (syncStatus != SyncStatus.STOPPED) {
|
|
|
|
do {
|
|
|
|
localClient.readMessages(EMPTY_FILTER, callback);
|
|
|
|
try {
|
|
|
|
semaphore.tryAcquire(WAIT_CAP, TimeUnit.MILLISECONDS);
|
|
//TODO: log hard error. Too much time trying to upload data.
|
|
|
|
} catch (InterruptedException ignored) {
|
|
// We expect an interruption when the upload will complete
|
|
}
|
|
|
|
} while (syncStatus == SyncStatus.PENDING);
|
|
|
|
// Database is synced. Wait for new data.
|
|
|
|
try {
|
|
semaphore.tryAcquire(SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
|
|
} catch (InterruptedException e) {
|
|
//TODO: log (probably nudged)
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@Override
|
|
public void nudge() {
|
|
semaphore.release();
|
|
}
|
|
|
|
@Override
|
|
public void stop() {
|
|
|
|
running.set(false);
|
|
updateSyncStatus(SyncStatus.STOPPED);
|
|
|
|
}
|
|
|
|
}
|