diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index 8e2284c..f944214 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -32,13 +32,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo private final DeletableBulletinBoardServer server; private final ListeningScheduledExecutorService executorService; private final BatchDigest digest; - private final int subsrciptionDelay; + private final long subsrciptionDelay; /** * Initializes an instance of the client * @param server an initialized Bulletin Board Server instance which will perform the actual processing of the requests * @param threadNum is the number of concurrent threads to allocate for the client - * @param subscriptionDelay is the required delay between subscription calls in milliseconds */ public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) { this.server = server; @@ -108,7 +107,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { - Futures.addCallback(executorService.schedule(new CompleteBatchPoster(completeBatch), subsrciptionDelay, TimeUnit.MILLISECONDS), callback); + Futures.addCallback(executorService.submit(new CompleteBatchPoster(completeBatch)), callback); digest.update(completeBatch); return digest.digestAsMessageID(); @@ -353,7 +352,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo filterList = filterBuilder.build(); // Reschedule job - Futures.addCallback(executorService.submit(new MessageReader(filterList)), this); + Futures.addCallback(executorService.schedule(new MessageReader(filterList), subsrciptionDelay, TimeUnit.MILLISECONDS), this); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java index 97b6d01..7700fd6 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -10,6 +10,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Created by Arbel on 13/04/2016. @@ -20,6 +21,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private DeletableSubscriptionBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; + private AtomicBoolean running; private volatile SyncStatus syncStatus; private List> messageCountCallbacks; @@ -160,8 +162,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize } - localClient.deleteMessage(message.getEntryNum()); - } catch (CommunicationException e) { // This is an error with the local server // TODO: log @@ -189,6 +189,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize this.syncStatus = SyncStatus.STOPPED; this.SLEEP_INTERVAL = sleepInterval; this.WAIT_CAP = waitCap; + this.running = new AtomicBoolean(false); } public SimpleBulletinBoardSynchronizer() { @@ -197,6 +198,12 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private synchronized void updateSyncStatus(SyncStatus newStatus) { + if (!running.get()) { + + newStatus = SyncStatus.STOPPED; + + } + if (newStatus != syncStatus){ syncStatus = newStatus; @@ -258,7 +265,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void run() { - if (syncStatus == SyncStatus.STOPPED) { + if (running.compareAndSet(false,true)){ updateSyncStatus(SyncStatus.PENDING); SyncCallback callback = new SyncCallback(); @@ -302,6 +309,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void stop() { + running.set(false); updateSyncStatus(SyncStatus.STOPPED); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java index 1813641..d412502 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java @@ -14,19 +14,18 @@ import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.util.BulletinBoardMessageComparator; import meerkat.util.BulletinBoardMessageGenerator; import org.junit.*; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import java.io.IOException; import java.io.InputStream; import java.security.*; import java.security.cert.CertificateException; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -38,12 +37,13 @@ public class BulletinBoardSynchronizerTest { private static final String REMOTE_SERVER_ADDRESS = "remoteDB"; private static final String LOCAL_SERVER_ADDRESS = "localDB"; + private static int testCount; private static final int THREAD_NUM = 3; private static final int SUBSCRIPTION_INTERVAL = 1000; - private static final int SYNC_SLEEP_INTERVAL = 100; - private static final int SYNC_WAIT_CAP = 200; + private static final int SYNC_SLEEP_INTERVAL = 500; + private static final int SYNC_WAIT_CAP = 1000; private DeletableSubscriptionBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; @@ -105,12 +105,14 @@ public class BulletinBoardSynchronizerTest { signerIDs[0] = signers[0].getSignerID(); + testCount = 0; + } @Before public void init() throws CommunicationException { - DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS)); + DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount)); remoteServer.init(REMOTE_SERVER_ADDRESS); remoteClient = new LocalBulletinBoardClient( @@ -118,7 +120,7 @@ public class BulletinBoardSynchronizerTest { THREAD_NUM, SUBSCRIPTION_INTERVAL); - DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS)); + DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount)); localServer.init(LOCAL_SERVER_ADDRESS); localClient = new LocalBulletinBoardClient( @@ -132,14 +134,24 @@ public class BulletinBoardSynchronizerTest { semaphore = new Semaphore(0); thrown = new LinkedList<>(); + testCount++; + } private class SyncStatusCallback implements FutureCallback { + private final SyncStatus statusToWaitFor; + private AtomicBoolean stillWaiting; + + public SyncStatusCallback(SyncStatus statusToWaitFor) { + this.statusToWaitFor = statusToWaitFor; + stillWaiting = new AtomicBoolean(true); + } + @Override public void onSuccess(SyncStatus result) { - if (result == SyncStatus.SYNCHRONIZED){ + if (result == statusToWaitFor && stillWaiting.compareAndSet(true, false)){ semaphore.release(); } @@ -148,7 +160,9 @@ public class BulletinBoardSynchronizerTest { @Override public void onFailure(Throwable t) { thrown.add(t); - semaphore.release(); + if (stillWaiting.compareAndSet(true,false)) { + semaphore.release(); + } } } @@ -199,7 +213,7 @@ public class BulletinBoardSynchronizerTest { localClient.postBatch(completeBatch); - synchronizer.subscribeToSyncStatus(new SyncStatusCallback()); + synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SYNCHRONIZED)); int[] expectedCounts = {2,0}; synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts)); @@ -207,7 +221,9 @@ public class BulletinBoardSynchronizerTest { Thread syncThread = new Thread(synchronizer); syncThread.start(); - semaphore.acquire(2); + if (!semaphore.tryAcquire(2, 4000, TimeUnit.MILLISECONDS)) { + thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync.")); + } synchronizer.stop(); syncThread.join(); @@ -237,6 +253,36 @@ public class BulletinBoardSynchronizerTest { } + @Test + public void testServerError() throws SignatureException, CommunicationException, InterruptedException { + + BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10); + + remoteClient.close(); + + synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SERVER_ERROR)); + + localClient.postMessage(msg); + + Thread thread = new Thread(synchronizer); + + thread.start(); + + if (!semaphore.tryAcquire(4000, TimeUnit.MILLISECONDS)) { + thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync.")); + } + + synchronizer.stop(); + thread.join(); + + if (thrown.size() > 0) { + for (Throwable t : thrown) + System.err.println(t.getMessage()); + assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false); + } + + } + @After public void close() { diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index 301fc8e..d331c53 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -1090,6 +1090,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override - public void close() {} + public void close() { + jdbcTemplate = null; + } }