Fixed some more issues (most have to do with concurrency).

Implemented close method for the SQLServer which renders it unusable until reinitialization.
Added test for Synchronizer for the case when the remote server is unavailable (test passes).
Still need to fix Batch digest and sign issue.
Cached-Client
arbel.peled 2016-06-02 10:38:31 +03:00
parent e91a48b5e1
commit e2f3dbe6b2
4 changed files with 75 additions and 20 deletions

View File

@ -32,13 +32,12 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
private final DeletableBulletinBoardServer server;
private final ListeningScheduledExecutorService executorService;
private final BatchDigest digest;
private final int subsrciptionDelay;
private final long subsrciptionDelay;
/**
* Initializes an instance of the client
* @param server an initialized Bulletin Board Server instance which will perform the actual processing of the requests
* @param threadNum is the number of concurrent threads to allocate for the client
* @param subscriptionDelay is the required delay between subscription calls in milliseconds
*/
public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) {
this.server = server;
@ -108,7 +107,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
@Override
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
Futures.addCallback(executorService.schedule(new CompleteBatchPoster(completeBatch), subsrciptionDelay, TimeUnit.MILLISECONDS), callback);
Futures.addCallback(executorService.submit(new CompleteBatchPoster(completeBatch)), callback);
digest.update(completeBatch);
return digest.digestAsMessageID();
@ -353,7 +352,7 @@ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBo
filterList = filterBuilder.build();
// Reschedule job
Futures.addCallback(executorService.submit(new MessageReader(filterList)), this);
Futures.addCallback(executorService.schedule(new MessageReader(filterList), subsrciptionDelay, TimeUnit.MILLISECONDS), this);
}

View File

@ -10,6 +10,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by Arbel on 13/04/2016.
@ -20,6 +21,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
private DeletableSubscriptionBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient;
private AtomicBoolean running;
private volatile SyncStatus syncStatus;
private List<FutureCallback<Integer>> messageCountCallbacks;
@ -160,8 +162,6 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
}
localClient.deleteMessage(message.getEntryNum());
} catch (CommunicationException e) {
// This is an error with the local server
// TODO: log
@ -189,6 +189,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
this.syncStatus = SyncStatus.STOPPED;
this.SLEEP_INTERVAL = sleepInterval;
this.WAIT_CAP = waitCap;
this.running = new AtomicBoolean(false);
}
public SimpleBulletinBoardSynchronizer() {
@ -197,6 +198,12 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
private synchronized void updateSyncStatus(SyncStatus newStatus) {
if (!running.get()) {
newStatus = SyncStatus.STOPPED;
}
if (newStatus != syncStatus){
syncStatus = newStatus;
@ -258,7 +265,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
@Override
public void run() {
if (syncStatus == SyncStatus.STOPPED) {
if (running.compareAndSet(false,true)){
updateSyncStatus(SyncStatus.PENDING);
SyncCallback callback = new SyncCallback();
@ -302,6 +309,7 @@ public class SimpleBulletinBoardSynchronizer implements BulletinBoardSynchronize
@Override
public void stop() {
running.set(false);
updateSyncStatus(SyncStatus.STOPPED);
}

View File

@ -14,19 +14,18 @@ import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.util.BulletinBoardMessageComparator;
import meerkat.util.BulletinBoardMessageGenerator;
import org.junit.*;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import java.io.IOException;
import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@ -38,12 +37,13 @@ public class BulletinBoardSynchronizerTest {
private static final String REMOTE_SERVER_ADDRESS = "remoteDB";
private static final String LOCAL_SERVER_ADDRESS = "localDB";
private static int testCount;
private static final int THREAD_NUM = 3;
private static final int SUBSCRIPTION_INTERVAL = 1000;
private static final int SYNC_SLEEP_INTERVAL = 100;
private static final int SYNC_WAIT_CAP = 200;
private static final int SYNC_SLEEP_INTERVAL = 500;
private static final int SYNC_WAIT_CAP = 1000;
private DeletableSubscriptionBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient;
@ -105,12 +105,14 @@ public class BulletinBoardSynchronizerTest {
signerIDs[0] = signers[0].getSignerID();
testCount = 0;
}
@Before
public void init() throws CommunicationException {
DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS));
DeletableBulletinBoardServer remoteServer = new BulletinBoardSQLServer(new H2QueryProvider(REMOTE_SERVER_ADDRESS + testCount));
remoteServer.init(REMOTE_SERVER_ADDRESS);
remoteClient = new LocalBulletinBoardClient(
@ -118,7 +120,7 @@ public class BulletinBoardSynchronizerTest {
THREAD_NUM,
SUBSCRIPTION_INTERVAL);
DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS));
DeletableBulletinBoardServer localServer = new BulletinBoardSQLServer(new H2QueryProvider(LOCAL_SERVER_ADDRESS + testCount));
localServer.init(LOCAL_SERVER_ADDRESS);
localClient = new LocalBulletinBoardClient(
@ -132,14 +134,24 @@ public class BulletinBoardSynchronizerTest {
semaphore = new Semaphore(0);
thrown = new LinkedList<>();
testCount++;
}
private class SyncStatusCallback implements FutureCallback<SyncStatus> {
private final SyncStatus statusToWaitFor;
private AtomicBoolean stillWaiting;
public SyncStatusCallback(SyncStatus statusToWaitFor) {
this.statusToWaitFor = statusToWaitFor;
stillWaiting = new AtomicBoolean(true);
}
@Override
public void onSuccess(SyncStatus result) {
if (result == SyncStatus.SYNCHRONIZED){
if (result == statusToWaitFor && stillWaiting.compareAndSet(true, false)){
semaphore.release();
}
@ -148,7 +160,9 @@ public class BulletinBoardSynchronizerTest {
@Override
public void onFailure(Throwable t) {
thrown.add(t);
semaphore.release();
if (stillWaiting.compareAndSet(true,false)) {
semaphore.release();
}
}
}
@ -199,7 +213,7 @@ public class BulletinBoardSynchronizerTest {
localClient.postBatch(completeBatch);
synchronizer.subscribeToSyncStatus(new SyncStatusCallback());
synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SYNCHRONIZED));
int[] expectedCounts = {2,0};
synchronizer.subscribeToRemainingMessagesCount(new MessageCountCallback(expectedCounts));
@ -207,7 +221,9 @@ public class BulletinBoardSynchronizerTest {
Thread syncThread = new Thread(synchronizer);
syncThread.start();
semaphore.acquire(2);
if (!semaphore.tryAcquire(2, 4000, TimeUnit.MILLISECONDS)) {
thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync."));
}
synchronizer.stop();
syncThread.join();
@ -237,6 +253,36 @@ public class BulletinBoardSynchronizerTest {
}
@Test
public void testServerError() throws SignatureException, CommunicationException, InterruptedException {
BulletinBoardMessage msg = messageGenerator.generateRandomMessage(signers, 10, 10);
remoteClient.close();
synchronizer.subscribeToSyncStatus(new SyncStatusCallback(SyncStatus.SERVER_ERROR));
localClient.postMessage(msg);
Thread thread = new Thread(synchronizer);
thread.start();
if (!semaphore.tryAcquire(4000, TimeUnit.MILLISECONDS)) {
thrown.add(new TimeoutException("Timeout occurred while waiting for synchronizer to sync."));
}
synchronizer.stop();
thread.join();
if (thrown.size() > 0) {
for (Throwable t : thrown)
System.err.println(t.getMessage());
assertThat("Exception thrown by Synchronizer: " + thrown.get(0).getMessage(), false);
}
}
@After
public void close() {

View File

@ -1090,6 +1090,8 @@ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
@Override
public void close() {}
public void close() {
jdbcTemplate = null;
}
}