diff --git a/bulletin-board-client/build.gradle b/bulletin-board-client/build.gradle new file mode 100644 index 0000000..8fdeba0 --- /dev/null +++ b/bulletin-board-client/build.gradle @@ -0,0 +1,242 @@ + +plugins { + id "us.kirchmeier.capsule" version "1.0.1" + id 'com.google.protobuf' version '0.7.0' +} + +apply plugin: 'java' +apply plugin: 'com.google.protobuf' +apply plugin: 'eclipse' +apply plugin: 'idea' + +apply plugin: 'maven-publish' + +// Is this a snapshot version? +ext { isSnapshot = false } + +ext { + groupId = 'org.factcenter.meerkat' + nexusRepository = "https://cs.idc.ac.il/nexus/content/groups/${isSnapshot ? 'unstable' : 'public'}/" + + // Credentials for IDC nexus repositories (needed only for using unstable repositories and publishing) + // Should be set in ${HOME}/.gradle/gradle.properties + nexusUser = project.hasProperty('nexusUser') ? project.property('nexusUser') : "" + nexusPassword = project.hasProperty('nexusPassword') ? project.property('nexusPassword') : "" +} + +description = "Meerkat Voting Common Library" + +// Your project version +version = "0.0" + +version += "${isSnapshot ? '-SNAPSHOT' : ''}" + + +dependencies { + + // Meerkat common + compile project(':meerkat-common') + compile project(':restful-api-common') + + // Jersey for RESTful API + compile 'org.glassfish.jersey.containers:jersey-container-servlet:2.22.+' + compile 'org.xerial:sqlite-jdbc:3.7.+' + + // Logging + compile 'org.slf4j:slf4j-api:1.7.7' + runtime 'ch.qos.logback:logback-classic:1.1.2' + runtime 'ch.qos.logback:logback-core:1.1.2' + + // Google protobufs + compile 'com.google.protobuf:protobuf-java:3.+' + + // Crypto + compile 'org.factcenter.qilin:qilin:1.1+' + compile 'org.bouncycastle:bcprov-jdk15on:1.53' + compile 'org.bouncycastle:bcpkix-jdk15on:1.53' + + // Depend on test resources from meerkat-common + testCompile project(path: ':meerkat-common', configuration: 'testOutput') + + // Depend on server compilation for the non-integration tests + testCompile project(path: ':bulletin-board-server') + + testCompile 'junit:junit:4.+' + testCompile 'org.hamcrest:hamcrest-all:1.3' + + runtime 'org.codehaus.groovy:groovy:2.4.+' +} + +test { + exclude '**/*IntegrationTest*' + outputs.upToDateWhen { false } +} + +task integrationTest(type: Test) { + include '**/*IntegrationTest*' +// debug = true + outputs.upToDateWhen { false } + +} + +/*==== You probably don't have to edit below this line =======*/ + + +// Setup test configuration that can appear as a dependency in +// other subprojects +configurations { + testOutput.extendsFrom (testCompile) +} + +task testJar(type: Jar, dependsOn: testClasses) { + classifier = 'tests' + from sourceSets.test.output +} + +artifacts { + testOutput testJar +} + + + +// The run task added by the application plugin +// is also of type JavaExec. +tasks.withType(JavaExec) { + // Assign all Java system properties from + // the command line to the JavaExec task. + systemProperties System.properties +} + + +protobuf { + // Configure the protoc executable + protoc { + // Download from repositories + artifact = 'com.google.protobuf:protoc:3.+' + } +} + +idea { + module { + project.sourceSets.each { sourceSet -> + + def srcDir = "${protobuf.generatedFilesBaseDir}/$sourceSet.name/java" + + // add protobuf generated sources to generated source dir. + if ("test".equals(sourceSet.name)) { + testSourceDirs += file(srcDir) + } else { + sourceDirs += file(srcDir) + } + generatedSourceDirs += file(srcDir) + + } + + // Don't exclude build directory + excludeDirs -= file(buildDir) + } +} + +/*=================================== + * "Fat" Build targets + *===================================*/ + +if (project.hasProperty('mainClassName') && (mainClassName != null)) { + + task mavenCapsule(type: MavenCapsule) { + description = "Generate a capsule jar that automatically downloads and caches dependencies when run." + applicationClass mainClassName + destinationDir = buildDir + } + + task fatCapsule(type: FatCapsule) { + description = "Generate a single capsule jar containing everything. Use -Pfatmain=... to override main class" + + destinationDir = buildDir + + def fatMain = hasProperty('fatmain') ? fatmain : mainClassName + + applicationClass fatMain + + def testJar = hasProperty('test') + + if (hasProperty('fatmain')) { + appendix = "fat-${fatMain}" + } else { + appendix = "fat" + } + + if (testJar) { + from sourceSets.test.output + } + } + +} + +/*=================================== + * Repositories + *===================================*/ + +repositories { + + mavenLocal(); + + // Prefer the local nexus repository (it may have 3rd party artifacts not found in mavenCentral) + maven { + url nexusRepository + + if (isSnapshot) { + credentials { username + password + + username nexusUser + password nexusPassword + } + } + } + + // Use 'maven central' for other dependencies. + mavenCentral() +} + +task "info" << { + println "Project: ${project.name}" + println "Description: ${project.description}" + println "--------------------------" + println "GroupId: $groupId" + println "Version: $version (${isSnapshot ? 'snapshot' : 'release'})" + println "" +} +info.description 'Print some information about project parameters' + + +/*=================================== + * Publishing + *===================================*/ + +publishing { + publications { + mavenJava(MavenPublication) { + groupId project.groupId + pom.withXml { + asNode().appendNode('description', project.description) + } + from project.components.java + + } + } + repositories { + maven { + url "https://cs.idc.ac.il/nexus/content/repositories/${project.isSnapshot ? 'snapshots' : 'releases'}" + credentials { username + password + + username nexusUser + password nexusPassword + } + } + } +} + + + diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java new file mode 100644 index 0000000..96ba76d --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/CachedBulletinBoardClient.java @@ -0,0 +1,168 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * This is a full-fledged implementation of a Bulletin Board Client + * It provides asynchronous access to several remote servers, as well as a local cache + * Read/write operations are performed on the local server + * After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated + * The database also employs a synchronizer which makes sure local data is sent to the remote servers + */ +public class CachedBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { + + private final BulletinBoardClient localClient; + private AsyncBulletinBoardClient remoteClient; + private BulletinBoardSubscriber subscriber; + + private final int threadPoolSize; + private final long failDelayInMilliseconds; + private final long subscriptionIntervalInMilliseconds; + + public CachedBulletinBoardClient(BulletinBoardClient localClient, + int threadPoolSize, + long failDelayInMilliseconds, + long subscriptionIntervalInMilliseconds) + throws IllegalAccessException, InstantiationException { + + this.localClient = localClient; + this.threadPoolSize = threadPoolSize; + this.failDelayInMilliseconds = failDelayInMilliseconds; + this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds; + + remoteClient = new ThreadedBulletinBoardClient(); + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { + return null; + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { + return null; + } + + @Override + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { + + } + + @Override + public void getRedundancy(MessageID id, FutureCallback callback) { + + } + + @Override + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { + + } + + @Override + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { + + } + + @Override + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + + } + + @Override + public void init(BulletinBoardClientParams clientParams) { + + remoteClient.init(clientParams); + + ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); + + List subscriberClients = new ArrayList<>(clientParams.getBulletinBoardAddressCount()); + + for (String address : clientParams.getBulletinBoardAddressList()){ + + SubscriptionAsyncBulletinBoardClient newClient = + new SingleServerBulletinBoardClient(executorService, failDelayInMilliseconds, subscriptionIntervalInMilliseconds); + + newClient.init(clientParams.toBuilder().clearBulletinBoardAddress().addBulletinBoardAddress(address).build()); + + subscriberClients.add(newClient); + + } + + subscriber = new ThreadedBulletinBoardSubscriber(subscriberClients, localClient); + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + return null; + } + + @Override + public float getRedundancy(MessageID id) { + return 0; + } + + @Override + public List readMessages(MessageFilterList filterList) { + return null; + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { + return null; + } + + @Override + public void close() { + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + } +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java new file mode 100644 index 0000000..df3e196 --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/LocalBulletinBoardClient.java @@ -0,0 +1,531 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.*; +import com.google.protobuf.ByteString; +import meerkat.comm.CommunicationException; +import meerkat.comm.MessageInputStream; +import meerkat.comm.MessageInputStream.MessageInputStreamFactory; +import meerkat.comm.MessageOutputStream; +import meerkat.crypto.concrete.SHA256Digest; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Voting.*; +import meerkat.util.BulletinBoardUtils; + +import javax.ws.rs.NotFoundException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Created by Arbel Deutsch Peled on 15-Mar-16. + * This client is to be used mainly for testing. + * It wraps a BulletinBoardServer in an asynchronous client. + * This means the access to the server is direct (via method calls) instead of through a TCP connection. + * The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously. + */ +public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient{ + + private final BulletinBoardServer server; + private final ListeningScheduledExecutorService executorService; + private final BatchDigest digest; + private final int subsrciptionDelay; + + /** + * Initializes an instance of the client + * @param server an initialized Bulletin Board Server instance which will perform the actual processing of the requests + * @param threadNum is the number of concurrent threads to allocate for the client + * @param subscriptionDelay is the required delay between subscription calls in milliseconds + */ + public LocalBulletinBoardClient(BulletinBoardServer server, int threadNum, int subscriptionDelay) { + this.server = server; + this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum)); + this.digest = new GenericBatchDigest(new SHA256Digest()); + this.subsrciptionDelay = subscriptionDelay; + } + + private class MessagePoster implements Callable { + + private final BulletinBoardMessage msg; + + public MessagePoster(BulletinBoardMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.postMessage(msg).getValue(); + } + + } + + @Override + public MessageID postMessage(BulletinBoardMessage msg, FutureCallback callback) { + + Futures.addCallback(executorService.submit(new MessagePoster(msg)), callback); + + digest.update(msg.getMsg()); + return digest.digestAsMessageID(); + + } + + private class CompleteBatchPoster implements Callable { + + private final CompleteBatch completeBatch; + + public CompleteBatchPoster(CompleteBatch completeBatch) { + this.completeBatch = completeBatch; + } + + + @Override + public Boolean call() throws Exception { + + if (!server.beginBatch(completeBatch.getBeginBatchMessage()).getValue()) + return false; + + int i=0; + for (BatchData data : completeBatch.getBatchDataList()){ + + BatchMessage message = BatchMessage.newBuilder() + .setSignerId(completeBatch.getSignature().getSignerId()) + .setBatchId(completeBatch.getBeginBatchMessage().getBatchId()) + .setSerialNum(i) + .setData(data) + .build(); + + if (!server.postBatchMessage(message).getValue()) + return false; + + i++; + } + + return server.closeBatchMessage(completeBatch.getCloseBatchMessage()).getValue(); + } + + } + + @Override + public MessageID postBatch(CompleteBatch completeBatch, FutureCallback callback) { + + Futures.addCallback(executorService.schedule(new CompleteBatchPoster(completeBatch), subsrciptionDelay, TimeUnit.MILLISECONDS), callback); + + digest.update(completeBatch); + return digest.digestAsMessageID(); + + } + + private class BatchBeginner implements Callable { + + private final BeginBatchMessage msg; + + public BatchBeginner(BeginBatchMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.beginBatch(msg).getValue(); + } + + } + + @Override + public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchBeginner(beginBatchMessage)), callback); + } + + private class BatchDataPoster implements Callable { + + private final ByteString signerId; + private final int batchId; + private final List batchDataList; + private final int startPosition; + + public BatchDataPoster(ByteString signerId, int batchId, List batchDataList, int startPosition) { + this.signerId = signerId; + this.batchId = batchId; + this.batchDataList = batchDataList; + this.startPosition = startPosition; + } + + + @Override + public Boolean call() throws Exception { + + BatchMessage.Builder msgBuilder = BatchMessage.newBuilder() + .setSignerId(signerId) + .setBatchId(batchId); + + int i = startPosition; + for (BatchData data : batchDataList){ + + msgBuilder.setSerialNum(i) + .setData(data); + + if (!server.postBatchMessage(msgBuilder.build()).getValue()) + return false; + + i++; + + } + + return true; + + } + + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + postBatchData(ByteString.copyFrom(signerId), batchId, batchDataList, startPosition, callback); + } + + @Override + public void postBatchData(byte[] signerId, int batchId, List batchDataList, FutureCallback callback) { + postBatchData(signerId, batchId, batchDataList, 0, callback); + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, int startPosition, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchDataPoster(signerId, batchId, batchDataList, startPosition)), callback); + } + + @Override + public void postBatchData(ByteString signerId, int batchId, List batchDataList, FutureCallback callback) { + postBatchData(signerId, batchId, batchDataList, 0, callback); + } + + private class BatchCloser implements Callable { + + private final CloseBatchMessage msg; + + public BatchCloser(CloseBatchMessage msg) { + this.msg = msg; + } + + + @Override + public Boolean call() throws Exception { + return server.closeBatchMessage(msg).getValue(); + } + + } + + @Override + public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new BatchCloser(closeBatchMessage)), callback); + } + + private class RedundancyGetter implements Callable { + + private final MessageID msgId; + + public RedundancyGetter(MessageID msgId) { + this.msgId = msgId; + } + + + @Override + public Float call() throws Exception { + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MSG_ID) + .setId(msgId.getID()) + .build()) + .build(); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream outputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList,outputStream); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + if (inputStream.isAvailable()) + return 1.0f; + else + return 0.0f; + + } + + } + + @Override + public void getRedundancy(MessageID id, FutureCallback callback) { + Futures.addCallback(executorService.submit(new RedundancyGetter(id)), callback); + } + + private class MessageReader implements Callable> { + + private final MessageFilterList filterList; + + public MessageReader(MessageFilterList filterList) { + this.filterList = filterList; + } + + + @Override + public List call() throws Exception { + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream outputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList, outputStream); + + MessageInputStream inputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + return inputStream.asList(); + + } + + } + + @Override + public void readMessages(MessageFilterList filterList, FutureCallback> callback) { + Futures.addCallback(executorService.submit(new MessageReader(filterList)), callback); + } + + class SubscriptionCallback implements FutureCallback> { + + private MessageFilterList filterList; + private final FutureCallback> callback; + + public SubscriptionCallback(MessageFilterList filterList, FutureCallback> callback) { + this.filterList = filterList; + this.callback = callback; + } + + @Override + public void onSuccess(List result) { + + // Report new messages to user + callback.onSuccess(result); + + MessageFilterList.Builder filterBuilder = filterList.toBuilder(); + + // If any new messages arrived: update the MIN_ENTRY condition + if (result.size() > 0) { + + // Remove last filter from list (MIN_ENTRY one) + filterBuilder.removeFilter(filterBuilder.getFilterCount() - 1); + + // Add updated MIN_ENTRY filter (entry number is successor of last received entry's number) + filterBuilder.addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(result.get(result.size() - 1).getEntryNum() + 1) + .build()); + + } + + filterList = filterBuilder.build(); + + // Reschedule job + Futures.addCallback(executorService.submit(new MessageReader(filterList)), this); + + } + + @Override + public void onFailure(Throwable t) { + + // Notify caller about failure and terminate subscription + callback.onFailure(t); + + } + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + MessageFilterList subscriptionFilterList = + filterList.toBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.MIN_ENTRY) + .setEntry(startEntry) + .build()) + .build(); + + Futures.addCallback(executorService.submit(new MessageReader(subscriptionFilterList)), new SubscriptionCallback(subscriptionFilterList, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); + } + + private class CompleteBatchReader implements Callable { + + private final BatchSpecificationMessage batchSpecificationMessage; + + public CompleteBatchReader(BatchSpecificationMessage batchSpecificationMessage) { + this.batchSpecificationMessage = batchSpecificationMessage; + } + + + @Override + public CompleteBatch call() throws Exception { + + final String[] TAGS_TO_REMOVE = {BulletinBoardConstants.BATCH_TAG, BulletinBoardConstants.BATCH_ID_TAG_PREFIX}; + + CompleteBatch completeBatch = new CompleteBatch(BeginBatchMessage.newBuilder() + .setSignerId(batchSpecificationMessage.getSignerId()) + .setBatchId(batchSpecificationMessage.getBatchId()) + .build()); + + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream batchOutputStream = new MessageOutputStream<>(byteOutputStream); + server.readBatch(batchSpecificationMessage,batchOutputStream); + + MessageInputStream batchInputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BatchData.class); + + completeBatch.appendBatchData(batchInputStream.asList()); + + MessageFilterList filterList = MessageFilterList.newBuilder() + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_TAG) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag(BulletinBoardConstants.BATCH_ID_TAG_PREFIX + completeBatch.getBeginBatchMessage().getBatchId()) + .build()) + .addFilter(MessageFilter.newBuilder() + .setType(FilterType.SIGNER_ID) + .setId(completeBatch.getBeginBatchMessage().getSignerId()) + .build()) + .build(); + + byteOutputStream = new ByteArrayOutputStream(); + MessageOutputStream messageOutputStream = new MessageOutputStream<>(byteOutputStream); + server.readMessages(filterList,messageOutputStream); + + MessageInputStream messageInputStream = + MessageInputStreamFactory.createMessageInputStream( + new ByteArrayInputStream(byteOutputStream.toByteArray()), + BulletinBoardMessage.class); + + if (!messageInputStream.isAvailable()) + throw new NotFoundException("Batch does not exist"); + + BulletinBoardMessage message = messageInputStream.readMessage(); + + completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() + .addAllTag(BulletinBoardUtils.removePrefixTags(message, Arrays.asList(TAGS_TO_REMOVE))) + .setSignerId(message.getSig(0).getSignerId()) + .setBatchId(Integer.parseInt(BulletinBoardUtils.findTagWithPrefix(message, BulletinBoardConstants.BATCH_ID_TAG_PREFIX))) + .build()); + + completeBatch.setSignature(message.getSig(0)); + completeBatch.setTimestamp(message.getMsg().getTimestamp()); + + return completeBatch; + + } + + } + + @Override + public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback callback) { + Futures.addCallback(executorService.submit(new CompleteBatchReader(batchSpecificationMessage)), callback); + } + + private class SyncQueryHandler implements Callable { + + private final SyncQuery syncQuery; + + public SyncQueryHandler(SyncQuery syncQuery) { + this.syncQuery = syncQuery; + } + + + @Override + public SyncQueryResponse call() throws Exception { + return server.querySync(syncQuery); + } + + } + + @Override + public void querySync(SyncQuery syncQuery, FutureCallback callback) { + Futures.addCallback(executorService.submit(new SyncQueryHandler(syncQuery)), callback); + } + + /** + * This method is a stub, since the implementation only considers one server, and that is given in the constructor + * @param ignored is ignored + */ + @Override + public void init(BulletinBoardClientParams ignored) {} + + @Override + public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { + + try { + + MessagePoster poster = new MessagePoster(msg); + poster.call(); + + digest.update(msg); + return digest.digestAsMessageID(); + + } catch (Exception e) { + return null; + } + + } + + @Override + public float getRedundancy(MessageID id) { + + try { + + RedundancyGetter getter = new RedundancyGetter(id); + return getter.call(); + + } catch (Exception e) { + return -1.0f; + } + + } + + @Override + public List readMessages(MessageFilterList filterList) { + + try { + + MessageReader reader = new MessageReader(filterList); + return reader.call(); + + } catch (Exception e){ + return null; + } + + } + + @Override + public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { + return server.generateSyncQuery(GenerateSyncQueryParams); + } + + @Override + public void close() { + try { + server.close(); + } catch (CommunicationException ignored) {} + } + +} diff --git a/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java new file mode 100644 index 0000000..cf8d47d --- /dev/null +++ b/bulletin-board-client/src/main/java/meerkat/bulletinboard/ThreadedBulletinBoardSubscriber.java @@ -0,0 +1,272 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.Timestamp; +import meerkat.comm.CommunicationException; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.util.BulletinBoardUtils; + +import static meerkat.protobuf.BulletinBoardAPI.FilterType.*; + +import java.sql.Time; +import java.util.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Created by Arbel Deutsch Peled on 03-Mar-16. + * A multi-server implementation of the {@link BulletinBoardSubscriber} + */ +public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber { + + protected final Collection clients; + protected final BulletinBoardClient localClient; + + protected Iterator clientIterator; + protected SubscriptionAsyncBulletinBoardClient currentClient; + + private long lastServerSwitchTime; + + private AtomicBoolean isSyncInProgress; + private Semaphore rescheduleSemaphore; + + private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f}; + + public ThreadedBulletinBoardSubscriber(Collection clients, BulletinBoardClient localClient) { + + this.clients = clients; + this.localClient = localClient; + + lastServerSwitchTime = System.currentTimeMillis(); + + clientIterator = clients.iterator(); + currentClient = clientIterator.next(); + + isSyncInProgress = new AtomicBoolean(false); + rescheduleSemaphore = new Semaphore(1); + + } + + /** + * Moves to next client and performs resync with it + */ + private void nextClient() { + + try { + + rescheduleSemaphore.acquire(); + + if (!clientIterator.hasNext()){ + clientIterator = clients.iterator(); + } + + currentClient = clientIterator.next(); + + lastServerSwitchTime = System.currentTimeMillis(); + + isSyncInProgress.set(false); + + rescheduleSemaphore.release(); + + } catch (InterruptedException e) { + // TODO: log + // Do not change client + } + + } + + private abstract class SubscriberCallback implements FutureCallback { + + protected final MessageFilterList filterList; + protected final FutureCallback> callback; + private final long invocationTime; + + public SubscriberCallback(MessageFilterList filterList, FutureCallback> callback) { + + this.filterList = filterList; + this.callback = callback; + this.invocationTime = System.currentTimeMillis(); + + } + + /** + * Handles resyncing process for the given subscription after a server is switched + * Specifically: generates a sync query from the local database and uses it to query the current server + */ + private void reSync() { + + SyncQuery syncQuery = null; + try { + + syncQuery = localClient.generateSyncQuery(GenerateSyncQueryParams.newBuilder() + .setFilterList(filterList) + .addAllBreakpointList(Arrays.asList(BREAKPOINTS)) + .build()); + + } catch (CommunicationException e) { + + // Handle failure in standard way + onFailure(e); + + } + + currentClient.querySync(syncQuery, new SyncQueryCallback(filterList, callback)); + + } + + /** + * Reschedules the subscription + */ + private void reschedule() { + + try { + + rescheduleSemaphore.acquire(); + + reSync(); + + rescheduleSemaphore.release(); + + + } catch (InterruptedException e) { + + //TODO: log + + callback.onFailure(e); // Hard error: Cannot guarantee subscription safety + + } + + } + + @Override + public void onFailure(Throwable t) { + + // If server failure is not already known: switch to next client and resync + if (invocationTime > lastServerSwitchTime){ + + // Make sure only what thread switches the client + if (isSyncInProgress.compareAndSet(false, true)){ + nextClient(); + } + + } + + reschedule(); + + } + + } + + /** + * Provides handling logic for resync query callback operation + * Receives a SyncQueryResponse and reads the missing data (starting from the received timestamp) if needed + */ + protected class SyncQueryCallback extends SubscriberCallback { + + public SyncQueryCallback (MessageFilterList filterList, FutureCallback> callback) { + + super(filterList, callback); + + } + + @Override + public void onSuccess(SyncQueryResponse result) { + + final Timestamp DEFAULT_TIME = BulletinBoardUtils.toTimestampProto(946728000); // Year 2000 + + // Read required messages according to received Timestamp + + Timestamp syncTimestamp; + + if (result.hasLastTimeOfSync()) { + syncTimestamp = result.getLastTimeOfSync(); // Use returned time of sync + } else { + syncTimestamp = DEFAULT_TIME; // Get all messages + } + + MessageFilterList timestampedFilterList = filterList.toBuilder() + .removeFilter(filterList.getFilterCount()-1) // Remove MIN_ENTRY filter + .addFilter(MessageFilter.newBuilder() // Add timestamp filter + .setType(AFTER_TIME) + .setTimestamp(syncTimestamp) + .build()) + .build(); + + currentClient.readMessages(timestampedFilterList, new ReSyncCallback(filterList, callback, result.getLastEntryNum())); + + } + + } + + /** + * Provides handling logic for callback of resyncing process + * Receives the missing messages, handles them and resubscribes + */ + protected class ReSyncCallback extends SubscriberCallback> { + + private long minEntry; + + public ReSyncCallback (MessageFilterList filterList, FutureCallback> callback, long minEntry) { + + super(filterList, callback); + + this.minEntry = minEntry; + + } + + @Override + public void onSuccess(List result) { + + // Propagate result to caller + callback.onSuccess(result); + + // Renew subscription + + MessageFilterList newFilterList = filterList.toBuilder() + .removeFilter(filterList.getFilterCount()-1) // Remove current MIN_ENTRY filter + .addFilter(MessageFilter.newBuilder() // Add new MIN_ENTRY filter for current server + .setType(MIN_ENTRY) + .setEntry(minEntry) + .build()) + .build(); + + currentClient.subscribe(newFilterList, callback); + + } + + } + + /** + * Provides the handling logic for results and failures of main subscription (while there are no errors) + */ + protected class SubscriptionCallback extends SubscriberCallback> { + + public SubscriptionCallback(MessageFilterList filterList, FutureCallback> callback){ + super(filterList, callback); + } + + + @Override + public void onSuccess(List result) { + + // Propagate result to caller + callback.onSuccess(result); + + } + + } + + @Override + public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback> callback) { + + currentClient.subscribe(filterList, startEntry, new SubscriptionCallback(filterList, callback)); + + } + + @Override + public void subscribe(MessageFilterList filterList, FutureCallback> callback) { + subscribe(filterList, 0, callback); + } + + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java new file mode 100644 index 0000000..88fb22c --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/GenericBulletinBoardClientTester.java @@ -0,0 +1,519 @@ +package meerkat.bulletinboard; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import meerkat.comm.CommunicationException; +import meerkat.crypto.concrete.ECDSASignature; +import meerkat.protobuf.BulletinBoardAPI.*; +import meerkat.protobuf.Crypto; +import meerkat.util.BulletinBoardMessageComparator; +import meerkat.util.BulletinBoardMessageGenerator; + +import java.io.IOException; +import java.io.InputStream; +import java.security.*; +import java.security.cert.CertificateException; +import java.util.*; +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; +import static org.junit.Assert.*; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class GenericBulletinBoardClientTester { + + // Signature resources + + private GenericBatchDigitalSignature signers[]; + private ByteString[] signerIDs; + + private static String KEYFILE_EXAMPLE = "/certs/enduser-certs/user1-key-with-password-secret.p12"; + private static String KEYFILE_EXAMPLE3 = "/certs/enduser-certs/user3-key-with-password-shh.p12"; + + private static String KEYFILE_PASSWORD1 = "secret"; + private static String KEYFILE_PASSWORD3 = "shh"; + + private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; + private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; + + // Client and callbacks + + private AsyncBulletinBoardClient bulletinBoardClient; + + private PostCallback postCallback; + private PostCallback failPostCallback = new PostCallback(true,false); + + private RedundancyCallback redundancyCallback; + private ReadCallback readCallback; + private ReadBatchCallback readBatchCallback; + + // Sync and misc + + private Semaphore jobSemaphore; + private Vector thrown; + private Random random; + + // Constructor + + public GenericBulletinBoardClientTester(AsyncBulletinBoardClient bulletinBoardClient){ + + this.bulletinBoardClient = bulletinBoardClient; + + signers = new GenericBatchDigitalSignature[2]; + signerIDs = new ByteString[signers.length]; + signers[0] = new GenericBatchDigitalSignature(new ECDSASignature()); + signers[1] = new GenericBatchDigitalSignature(new ECDSASignature()); + + InputStream keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE); + char[] password = KEYFILE_PASSWORD1.toCharArray(); + + KeyStore.Builder keyStoreBuilder; + try { + keyStoreBuilder = signers[0].getPKCS12KeyStoreBuilder(keyStream, password); + + signers[0].loadSigningCertificate(keyStoreBuilder); + + signers[0].loadVerificationCertificates(getClass().getResourceAsStream(CERT1_PEM_EXAMPLE)); + + keyStream = getClass().getResourceAsStream(KEYFILE_EXAMPLE3); + password = KEYFILE_PASSWORD3.toCharArray(); + + keyStoreBuilder = signers[1].getPKCS12KeyStoreBuilder(keyStream, password); + signers[1].loadSigningCertificate(keyStoreBuilder); + + signers[1].loadVerificationCertificates(getClass().getResourceAsStream(CERT3_PEM_EXAMPLE)); + + for (int i = 0 ; i < signers.length ; i++) { + signerIDs[i] = signers[i].getSignerID(); + } + + } catch (IOException e) { + System.err.println("Failed reading from signature file " + e.getMessage()); + fail("Failed reading from signature file " + e.getMessage()); + } catch (CertificateException e) { + System.err.println("Failed reading certificate " + e.getMessage()); + fail("Failed reading certificate " + e.getMessage()); + } catch (KeyStoreException e) { + System.err.println("Failed reading keystore " + e.getMessage()); + fail("Failed reading keystore " + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + System.err.println("Couldn't find signing algorithm " + e.getMessage()); + fail("Couldn't find signing algorithm " + e.getMessage()); + } catch (UnrecoverableKeyException e) { + System.err.println("Couldn't find signing key " + e.getMessage()); + fail("Couldn't find signing key " + e.getMessage()); + } + + } + + // Callback definitions + + protected void genericHandleFailure(Throwable t){ + System.err.println(t.getCause() + " " + t.getMessage()); + thrown.add(t); + jobSemaphore.release(); + } + + private class PostCallback implements FutureCallback{ + + private boolean isAssert; + private boolean assertValue; + + public PostCallback() { + this(false); + } + + public PostCallback(boolean isAssert) { + this(isAssert,true); + } + + public PostCallback(boolean isAssert, boolean assertValue) { + this.isAssert = isAssert; + this.assertValue = assertValue; + } + + @Override + public void onSuccess(Boolean msg) { + System.err.println("Post operation completed"); + jobSemaphore.release(); + //TODO: Change Assert mechanism to exception one + if (isAssert) { + if (assertValue) { + assertThat("Post operation failed", msg, is(Boolean.TRUE)); + } else { + assertThat("Post operation succeeded unexpectedly", msg, is(Boolean.FALSE)); + } + } + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class RedundancyCallback implements FutureCallback{ + + private float minRedundancy; + + public RedundancyCallback(float minRedundancy) { + this.minRedundancy = minRedundancy; + } + + @Override + public void onSuccess(Float redundancy) { + System.err.println("Redundancy found is: " + redundancy); + jobSemaphore.release(); + assertThat(redundancy, greaterThanOrEqualTo(minRedundancy)); + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class ReadCallback implements FutureCallback>{ + + private List expectedMsgList; + + public ReadCallback(List expectedMsgList) { + this.expectedMsgList = expectedMsgList; + } + + @Override + public void onSuccess(List messages) { + + System.err.println(messages); + jobSemaphore.release(); + + BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator(); + + assertThat(messages.size(), is(expectedMsgList.size())); + + Iterator expectedMessageIterator = expectedMsgList.iterator(); + Iterator receivedMessageIterator = messages.iterator(); + + while (expectedMessageIterator.hasNext()) { + assertThat(msgComparator.compare(expectedMessageIterator.next(), receivedMessageIterator.next()), is(0)); + } + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + private class ReadBatchCallback implements FutureCallback { + + private CompleteBatch expectedBatch; + + public ReadBatchCallback(CompleteBatch expectedBatch) { + this.expectedBatch = expectedBatch; + } + + @Override + public void onSuccess(CompleteBatch batch) { + + System.err.println(batch); + jobSemaphore.release(); + + assertThat("Batch returned is incorrect", batch, is(equalTo(expectedBatch))); + + } + + @Override + public void onFailure(Throwable t) { + genericHandleFailure(t); + } + } + + // Randomness generators + + private byte randomByte(){ + return (byte) random.nextInt(); + } + + private byte[] randomByteArray(int length) { + + byte[] randomBytes = new byte[length]; + + for (int i = 0; i < length ; i++){ + randomBytes[i] = randomByte(); + } + + return randomBytes; + + } + + private CompleteBatch createRandomBatch(int signer, int batchId, int length) throws SignatureException { + + CompleteBatch completeBatch = new CompleteBatch(); + + // Create data + + completeBatch.setBeginBatchMessage(BeginBatchMessage.newBuilder() + .setSignerId(signerIDs[signer]) + .setBatchId(batchId) + .addTag("Test") + .build()); + + for (int i = 0 ; i < length ; i++){ + + BatchData batchData = BatchData.newBuilder() + .setData(ByteString.copyFrom(randomByteArray(i))) + .build(); + + completeBatch.appendBatchData(batchData); + + } + + completeBatch.setTimestamp(Timestamp.newBuilder() + .setSeconds(Math.abs(90)) + .setNanos(50) + .build()); + + signers[signer].updateContent(completeBatch); + + completeBatch.setSignature(signers[signer].sign()); + + return completeBatch; + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + public void init(){ + + random = new Random(0); // We use insecure randomness in tests for repeatability + + postCallback = new PostCallback(); + redundancyCallback = new RedundancyCallback((float) 1.0); + + thrown = new Vector<>(); + jobSemaphore = new Semaphore(0); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + public void close() { + + if (thrown.size() > 0) { + assert false; + } + + } + + /** + * Tests the standard post, redundancy and read methods + */ + public void postTest() { + + byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; + byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14}; + byte[] b3 = {(byte) 21, (byte) 22, (byte) 23, (byte) 24}; + + BulletinBoardMessage msg; + + MessageFilterList filterList; + List msgList; + + MessageID messageID; + + msg = BulletinBoardMessage.newBuilder() + .setMsg(UnsignedBulletinBoardMessage.newBuilder() + .addTag("Signature") + .addTag("Trustee") + .setData(ByteString.copyFrom(b1)) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(20) + .setNanos(30) + .build()) + .build()) + .addSig(Crypto.Signature.newBuilder() + .setType(Crypto.SignatureType.DSA) + .setData(ByteString.copyFrom(b2)) + .setSignerId(ByteString.copyFrom(b3)) + .build()) + .addSig(Crypto.Signature.newBuilder() + .setType(Crypto.SignatureType.ECDSA) + .setData(ByteString.copyFrom(b3)) + .setSignerId(ByteString.copyFrom(b2)) + .build()) + .build(); + + messageID = bulletinBoardClient.postMessage(msg,postCallback); + + try { + jobSemaphore.acquire(); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + bulletinBoardClient.getRedundancy(messageID,redundancyCallback); + + filterList = MessageFilterList.newBuilder() + .addFilter( + MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag("Signature") + .build() + ) + .addFilter( + MessageFilter.newBuilder() + .setType(FilterType.TAG) + .setTag("Trustee") + .build() + ) + .build(); + + msgList = new LinkedList<>(); + msgList.add(msg); + + readCallback = new ReadCallback(msgList); + + bulletinBoardClient.readMessages(filterList, readCallback); + try { + jobSemaphore.acquire(2); + } catch (InterruptedException e) { + System.err.println(e.getCause() + " " + e.getMessage()); + } + + } + + /** + * Tests posting a batch by parts + * Also tests not being able to post to a closed batch + * @throws CommunicationException, SignatureException, InterruptedException + */ + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + final int SIGNER = 1; + final int BATCH_ID = 100; + final int BATCH_LENGTH = 100; + + CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); + + // Begin batch + + bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), postCallback); + + jobSemaphore.acquire(); + + // Post data + + bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), postCallback); + + jobSemaphore.acquire(); + + // Close batch + + CloseBatchMessage closeBatchMessage = completeBatch.getCloseBatchMessage(); + + bulletinBoardClient.closeBatch(closeBatchMessage, postCallback); + + jobSemaphore.acquire(); + + // Attempt to open batch again + + bulletinBoardClient.beginBatch(completeBatch.getBeginBatchMessage(), failPostCallback); + + // Attempt to add batch data + + bulletinBoardClient.postBatchData(signerIDs[SIGNER], BATCH_ID, completeBatch.getBatchDataList(), failPostCallback); + + jobSemaphore.acquire(2); + + // Read batch data + + BatchSpecificationMessage batchSpecificationMessage = + BatchSpecificationMessage.newBuilder() + .setSignerId(signerIDs[SIGNER]) + .setBatchId(BATCH_ID) + .setStartPosition(0) + .build(); + + readBatchCallback = new ReadBatchCallback(completeBatch); + + bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + + jobSemaphore.acquire(); + + } + + /** + * Posts a complete batch message + * Checks reading of the message + * @throws CommunicationException, SignatureException, InterruptedException + */ + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + final int SIGNER = 0; + final int BATCH_ID = 101; + final int BATCH_LENGTH = 50; + + // Post batch + + CompleteBatch completeBatch = createRandomBatch(SIGNER, BATCH_ID, BATCH_LENGTH); + + bulletinBoardClient.postBatch(completeBatch,postCallback); + + jobSemaphore.acquire(); + + // Read batch + + BatchSpecificationMessage batchSpecificationMessage = + BatchSpecificationMessage.newBuilder() + .setSignerId(signerIDs[SIGNER]) + .setBatchId(BATCH_ID) + .setStartPosition(0) + .build(); + + readBatchCallback = new ReadBatchCallback(completeBatch); + + bulletinBoardClient.readBatch(batchSpecificationMessage, readBatchCallback); + + jobSemaphore.acquire(); + + } + + /** + * Tests that an unopened batch cannot be closed + * @throws CommunicationException, InterruptedException + */ + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + final int NON_EXISTENT_BATCH_ID = 999; + + CloseBatchMessage closeBatchMessage = + CloseBatchMessage.newBuilder() + .setBatchId(NON_EXISTENT_BATCH_ID) + .setBatchLength(1) + .setSig(Crypto.Signature.getDefaultInstance()) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(9) + .setNanos(12) + .build()) + .build(); + + // Try to close the (unopened) batch; + + bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback); + + jobSemaphore.acquire(); + + } + +} diff --git a/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java new file mode 100644 index 0000000..b69cf72 --- /dev/null +++ b/bulletin-board-client/src/test/java/meerkat/bulletinboard/ThreadedBulletinBoardClientIntegrationTest.java @@ -0,0 +1,95 @@ +package meerkat.bulletinboard; + +import meerkat.comm.CommunicationException; + +import meerkat.protobuf.Voting.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.security.SignatureException; +import java.util.LinkedList; +import java.util.List; + +/** + * Created by Arbel Deutsch Peled on 05-Dec-15. + */ +public class ThreadedBulletinBoardClientIntegrationTest { + + // Server data + + private static String PROP_GETTY_URL = "gretty.httpBaseURI"; + private static String DEFAULT_BASE_URL = "http://localhost:8081"; + private static String BASE_URL = System.getProperty(PROP_GETTY_URL, DEFAULT_BASE_URL); + + // Tester + private GenericBulletinBoardClientTester clientTest; + + public ThreadedBulletinBoardClientIntegrationTest(){ + + ThreadedBulletinBoardClient client = new ThreadedBulletinBoardClient(); + + List testDB = new LinkedList<>(); + testDB.add(BASE_URL); + + client.init(BulletinBoardClientParams.newBuilder() + .addAllBulletinBoardAddress(testDB) + .setMinRedundancy((float) 1.0) + .build()); + + clientTest = new GenericBulletinBoardClientTester(client); + + } + + // Test methods + + /** + * Takes care of initializing the client and the test resources + */ + @Before + public void init(){ + + clientTest.init(); + + } + + /** + * Closes the client and makes sure the test fails when an exception occurred in a separate thread + */ + + @After + public void close() { + + clientTest.close(); + + } + + @Test + public void postTest() { + + clientTest.postTest(); + + } + + @Test + public void testBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testBatchPost(); + } + + @Test + public void testCompleteBatchPost() throws CommunicationException, SignatureException, InterruptedException { + + clientTest.testCompleteBatchPost(); + + } + + @Test + public void testInvalidBatchClose() throws CommunicationException, InterruptedException { + + clientTest.testInvalidBatchClose(); + + } + +}