From e2f3dbe6b2f615012e3365839369cf0a1e6b3e33 Mon Sep 17 00:00:00 2001 From: "arbel.peled" Date: Thu, 2 Jun 2016 10:38:31 +0300 Subject: [PATCH] Fixed some more issues (most have to do with concurrency). Implemented close method for the SQLServer which renders it unusable until reinitialization. Added test for Synchronizer for the case when the remote server is unavailable (test passes). Still need to fix Batch digest and sign issue. --- .../LocalBulletinBoardClient.java | 7 +- .../SimpleBulletinBoardSynchronizer.java | 14 +++- .../BulletinBoardSynchronizerTest.java | 70 +++++++++++++++---- .../sqlserver/BulletinBoardSQLServer.java | 4 +- 4 files changed, 75 insertions(+), 20 deletions(-) diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java index 8e2284c..f944214 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -32,13 +32,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo private final DeletableBulletinBoardServer server; private final ListeningScheduledExecutorService executorService; private final BatchDigest digest; - private final int subsrciptionDelay; + private final long subsrciptionDelay; /** * Initializes an instance of the client * @param server an initialized Bulletin Board Server instance which will perform the actual processing of the requests * @param threadNum is the number of concurrent threads to allocate for the client - * @param subscriptionDelay is the required delay between subscription calls in milliseconds */ public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) { this.server = server; @@ -108,7 +107,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo @Override public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { - Futures.addCallback(executorService.schedule(new CompleteBatchPoster(completeBatch), subsrciptionDelay, TimeUnit.MILLISECONDS), callback); + Futures.addCallback(executorService.submit(new CompleteBatchPoster(completeBatch)), callback); digest.update(completeBatch); return digest.digestAsMessageID(); @@ -353,7 +352,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo filterList = filterBuilder.build(); // Reschedule job - Futures.addCallback(executorService.submit(new MessageReader(filterList)), this); + Futures.addCallback(executorService.schedule(new MessageReader(filterList), subsrciptionDelay, TimeUnit.MILLISECONDS), this); } diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java index 97b6d01..7700fd6 100644 --- a/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/SimpleBulletinBoardSynchronizer.java @@ -10,6 +10,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Created by Arbel on 13/04/2016. @@ -20,6 +21,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private DeletableSubscriptionBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; + private AtomicBoolean running; private volatile SyncStatus syncStatus; private List> messageCountCallbacks; @@ -160,8 +162,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize } - localClient.deleteMessage(message.getEntryNum()); - } catch (CommunicationException e) { // This is an error with the local server // TODO: log @@ -189,6 +189,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize this.syncStatus = SyncStatus.STOPPED; this.SLEEP_INTERVAL = sleepInterval; this.WAIT_CAP = waitCap; + this.running = new AtomicBoolean(false); } public SimpleBulletinBoardSynchronizer() { @@ -197,6 +198,12 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize private synchronized void updateSyncStatus(SyncStatus newStatus) { + if (!running.get()) { + + newStatus = SyncStatus.STOPPED; + + } + if (newStatus != syncStatus){ syncStatus = newStatus; @@ -258,7 +265,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void run() { - if (syncStatus == SyncStatus.STOPPED) { + if (running.compareAndSet(false,true)){ updateSyncStatus(SyncStatus.PENDING); SyncCallback callback = new SyncCallback(); @@ -302,6 +309,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize @Override public void stop() { + running.set(false); updateSyncStatus(SyncStatus.STOPPED); } diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java index 1813641..d412502 100644 --- a/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/BulletinBoardSynchronizerTest.java @@ -14,19 +14,18 @@ import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.util.BulletinBoardMessageComparator; import meerkat.util.BulletinBoardMessageGenerator; import org.junit.*; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import java.io.IOException; import java.io.InputStream; import java.security.*; import java.security.cert.CertificateException; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -38,12 +37,13 @@ public class BulletinBoardSynchronizerTest { private static final String REMOTE_SERVER_ADDRESS = "remoteDB"; private static final String LOCAL_SERVER_ADDRESS = "localDB"; + private static int testCount; private static final int THREAD_NUM = 3; private static final int SUBSCRIPTION_INTERVAL = 1000; - private static final int SYNC_SLEEP_INTERVAL = 100; - private static final int SYNC_WAIT_CAP = 200; + private static final int SYNC_SLEEP_INTERVAL = 500; + private static final int SYNC_WAIT_CAP = 1000; private DeletableSubscriptionBulletinBoardClient localClient; private AsyncBulletinBoardClient remoteClient; @@ -105,12 +105,14 @@ public class BulletinBoardSynchronizerTest { signerIDs[0] = signers[0].getSignerID(); + testCount = 0; + } @Before public void init() throws CommunicationException { - DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS)); + DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount)); remoteServer.init(REMOTE_SERVER_ADDRESS); remoteClient = new LocalBulletinBoardClient( @@ -118,7 +120,7 @@ public class BulletinBoardSynchronizerTest { THREAD_NUM, SUBSCRIPTION_INTERVAL); - DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS)); + DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount)); localServer.init(LOCAL_SERVER_ADDRESS); localClient = new LocalBulletinBoardClient( @@ -132,14 +134,24 @@ public class BulletinBoardSynchronizerTest { semaphore = new Semaphore(0); thrown = new LinkedList<>(); + testCount++; + } private class SyncStatusCallback implements FutureCallback { + private final SyncStatus statusToWaitFor; + private AtomicBoolean stillWaiting; + + public SyncStatusCallback(SyncStatus statusToWaitFor) { + this.statusToWaitFor = statusToWaitFor; + stillWaiting = new AtomicBoolean(true); + } + @Override public void onSuccess(SyncStatus result) { - if (result == SyncStatus.SYNCHRONIZED){ + if (result == statusToWaitFor && stillWaiting.compareAndSet(true, false)){ semaphore.release(); } @@ -148,7 +160,9 @@ public class BulletinBoardSynchronizerTest { @Override public void onFailure(Throwable t) { thrown.add(t); - semaphore.release(); + if (stillWaiting.compareAndSet(true,false)) { + semaphore.release(); + } } } @@ -199,7 +213,7 @@ public class BulletinBoardSynchronizerTest { localClient.postBatch(completeBatch); - synchronizer.subscribeToSyncStatus(new SyncStatusCallback()); + synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SYNCHRONIZED)); int[] expectedCounts = {2,0}; synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts)); @@ -207,7 +221,9 @@ public class BulletinBoardSynchronizerTest { Thread syncThread = new Thread(synchronizer); syncThread.start(); - semaphore.acquire(2); + if (!semaphore.tryAcquire(2, 4000, TimeUnit.MILLISECONDS)) { + thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync.")); + } synchronizer.stop(); syncThread.join(); @@ -237,6 +253,36 @@ public class BulletinBoardSynchronizerTest { } + @Test + public void testServerError() throws SignatureException, CommunicationException, InterruptedException { + + BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10); + + remoteClient.close(); + + synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SERVER_ERROR)); + + localClient.postMessage(msg); + + Thread thread = new Thread(synchronizer); + + thread.start(); + + if (!semaphore.tryAcquire(4000, TimeUnit.MILLISECONDS)) { + thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync.")); + } + + synchronizer.stop(); + thread.join(); + + if (thrown.size() > 0) { + for (Throwable t : thrown) + System.err.println(t.getMessage()); + assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false); + } + + } + @After public void close() { diff --git a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java index 301fc8e..d331c53 100644 --- a/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java +++ b/bulletin-board-server/src/main/java/meerkat/bulletinboard/sqlserver/BulletinBoardSQLServer.java @@ -1090,6 +1090,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{ @Override - public void close() {} + public void close() { + jdbcTemplate = null; + } }