parent
7c60e487cc
commit
e91a48b5e1
|
@ -500,7 +500,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
|
||||||
MessagePoster poster = new MessagePoster(msg);
|
MessagePoster poster = new MessagePoster(msg);
|
||||||
poster.call();
|
poster.call();
|
||||||
|
|
||||||
digest.update(msg);
|
digest.update(msg.getMsg());
|
||||||
return digest.digestAsMessageID();
|
return digest.digestAsMessageID();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,11 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
private List<FutureCallback<SyncStatus>> syncStatusCallbacks;
|
private List<FutureCallback<SyncStatus>> syncStatusCallbacks;
|
||||||
|
|
||||||
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
|
private static final MessageFilterList EMPTY_FILTER = MessageFilterList.getDefaultInstance();
|
||||||
private static final int SLEEP_INTERVAL = 10000; // 10 Seconds
|
private static final int DEFAULT_SLEEP_INTERVAL = 10000; // 10 Seconds
|
||||||
private static final int WAIT_CAP = 300000; // 5 minutes wait before deciding that the sync has failed fatally
|
private static final int DEFAULT_WAIT_CAP = 300000; // 5 minutes wait before deciding that the sync has failed fatally
|
||||||
|
|
||||||
|
private final int SLEEP_INTERVAL;
|
||||||
|
private final int WAIT_CAP;
|
||||||
|
|
||||||
private Semaphore semaphore;
|
private Semaphore semaphore;
|
||||||
|
|
||||||
|
@ -123,6 +126,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
if (result.size() == 0) {
|
if (result.size() == 0) {
|
||||||
newStatus = SyncStatus.SYNCHRONIZED;
|
newStatus = SyncStatus.SYNCHRONIZED;
|
||||||
|
semaphore.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
else{ // Upload messages
|
else{ // Upload messages
|
||||||
|
@ -181,8 +185,14 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimpleBulletinBoardSynchronizer() {
|
public SimpleBulletinBoardSynchronizer(int sleepInterval, int waitCap) {
|
||||||
this.syncStatus = SyncStatus.STOPPED;
|
this.syncStatus = SyncStatus.STOPPED;
|
||||||
|
this.SLEEP_INTERVAL = sleepInterval;
|
||||||
|
this.WAIT_CAP = waitCap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimpleBulletinBoardSynchronizer() {
|
||||||
|
this(DEFAULT_SLEEP_INTERVAL, DEFAULT_WAIT_CAP);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void updateSyncStatus(SyncStatus newStatus) {
|
private synchronized void updateSyncStatus(SyncStatus newStatus) {
|
||||||
|
@ -257,11 +267,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
|
||||||
if (syncStatus == SyncStatus.PENDING || syncStatus == SyncStatus.SERVER_ERROR) {
|
localClient.readMessages(EMPTY_FILTER, callback);
|
||||||
|
|
||||||
localClient.readMessages(EMPTY_FILTER, callback);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -272,7 +278,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
|
||||||
// We expect an interruption when the upload will complete
|
// We expect an interruption when the upload will complete
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} while (syncStatus == SyncStatus.PENDING);
|
} while (syncStatus == SyncStatus.PENDING);
|
||||||
|
|
||||||
// Database is synced. Wait for new data.
|
// Database is synced. Wait for new data.
|
||||||
|
|
|
@ -42,6 +42,9 @@ public class BulletinBoardSynchronizerTest {
|
||||||
private static final int THREAD_NUM = 3;
|
private static final int THREAD_NUM = 3;
|
||||||
private static final int SUBSCRIPTION_INTERVAL = 1000;
|
private static final int SUBSCRIPTION_INTERVAL = 1000;
|
||||||
|
|
||||||
|
private static final int SYNC_SLEEP_INTERVAL = 100;
|
||||||
|
private static final int SYNC_WAIT_CAP = 200;
|
||||||
|
|
||||||
private DeletableSubscriptionBulletinBoardClient localClient;
|
private DeletableSubscriptionBulletinBoardClient localClient;
|
||||||
private AsyncBulletinBoardClient remoteClient;
|
private AsyncBulletinBoardClient remoteClient;
|
||||||
|
|
||||||
|
@ -70,6 +73,7 @@ public class BulletinBoardSynchronizerTest {
|
||||||
signerIDs = new ByteString[1];
|
signerIDs = new ByteString[1];
|
||||||
|
|
||||||
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
|
signers[0] = new GenericBatchDigitalSignature(new ECDSASignature());
|
||||||
|
signerIDs[0] = signers[0].getSignerID();
|
||||||
|
|
||||||
InputStream keyStream = BulletinBoardSynchronizerTest.class.getResourceAsStream(KEYFILE_EXAMPLE);
|
InputStream keyStream = BulletinBoardSynchronizerTest.class.getResourceAsStream(KEYFILE_EXAMPLE);
|
||||||
char[] password = KEYFILE_PASSWORD1.toCharArray();
|
char[] password = KEYFILE_PASSWORD1.toCharArray();
|
||||||
|
@ -122,7 +126,7 @@ public class BulletinBoardSynchronizerTest {
|
||||||
THREAD_NUM,
|
THREAD_NUM,
|
||||||
SUBSCRIPTION_INTERVAL);
|
SUBSCRIPTION_INTERVAL);
|
||||||
|
|
||||||
synchronizer = new SimpleBulletinBoardSynchronizer();
|
synchronizer = new SimpleBulletinBoardSynchronizer(SYNC_SLEEP_INTERVAL, SYNC_WAIT_CAP);
|
||||||
synchronizer.init(localClient, remoteClient);
|
synchronizer.init(localClient, remoteClient);
|
||||||
|
|
||||||
semaphore = new Semaphore(0);
|
semaphore = new Semaphore(0);
|
||||||
|
@ -200,15 +204,19 @@ public class BulletinBoardSynchronizerTest {
|
||||||
int[] expectedCounts = {2,0};
|
int[] expectedCounts = {2,0};
|
||||||
synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts));
|
synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts));
|
||||||
|
|
||||||
Thread t = new Thread(synchronizer);
|
Thread syncThread = new Thread(synchronizer);
|
||||||
t.run();
|
syncThread.start();
|
||||||
|
|
||||||
semaphore.acquire();
|
semaphore.acquire(2);
|
||||||
|
|
||||||
synchronizer.stop();
|
synchronizer.stop();
|
||||||
t.join();
|
syncThread.join();
|
||||||
|
|
||||||
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), thrown.size() == 0);
|
if (thrown.size() > 0) {
|
||||||
|
for (Throwable t : thrown)
|
||||||
|
System.err.println(t.getMessage());
|
||||||
|
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false);
|
||||||
|
}
|
||||||
|
|
||||||
List<BulletinBoardMessage> msgList = remoteClient.readMessages(MessageFilterList.newBuilder()
|
List<BulletinBoardMessage> msgList = remoteClient.readMessages(MessageFilterList.newBuilder()
|
||||||
.addFilter(MessageFilter.newBuilder()
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
|
|
@ -16,8 +16,6 @@ import java.security.cert.CertificateException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -187,10 +185,8 @@ public class GenericSubscriptionClientTester {
|
||||||
|
|
||||||
public void subscriptionTest() throws SignatureException, CommunicationException {
|
public void subscriptionTest() throws SignatureException, CommunicationException {
|
||||||
|
|
||||||
final int FIRST_POST_ID = 201;
|
|
||||||
final int SECOND_POST_ID = 202;
|
|
||||||
final String COMMON_TAG = "SUBSCRIPTION_TEST";
|
final String COMMON_TAG = "SUBSCRIPTION_TEST";
|
||||||
|
|
||||||
List<String> tags = new LinkedList<>();
|
List<String> tags = new LinkedList<>();
|
||||||
tags.add(COMMON_TAG);
|
tags.add(COMMON_TAG);
|
||||||
|
|
||||||
|
|
|
@ -231,7 +231,7 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
|
||||||
BasicDataSource dataSource = new BasicDataSource();
|
BasicDataSource dataSource = new BasicDataSource();
|
||||||
|
|
||||||
dataSource.setDriverClassName("org.h2.Driver");
|
dataSource.setDriverClassName("org.h2.Driver");
|
||||||
dataSource.setUrl("jdbc:h2:~/" + dbName);
|
dataSource.setUrl("jdbc:h2:mem:" + dbName);
|
||||||
|
|
||||||
return dataSource;
|
return dataSource;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue