Threaded Client integration tests passing
parent
d1f7413cde
commit
cc2888483d
|
@ -160,11 +160,18 @@ public class ThreadedBulletinBoardClient extends SimpleBulletinBoardClient imple
|
|||
}
|
||||
|
||||
@Override
|
||||
public void closeBatch(BatchIdentifier payload, Timestamp timestamp, Iterable<Signature> signatures, FutureCallback<Boolean> callback) {
|
||||
public void closeBatch(BatchIdentifier payload, Timestamp timestamp, Iterable<Signature> signatures, FutureCallback<Boolean> callback)
|
||||
throws IllegalArgumentException{
|
||||
|
||||
if (!(payload instanceof MultiServerBatchIdentifier)) {
|
||||
throw new IllegalArgumentException("Error: batch identifier supplied was not created by this class.");
|
||||
}
|
||||
|
||||
MultiServerBatchIdentifier identifier = (MultiServerBatchIdentifier) payload;
|
||||
|
||||
// Create job
|
||||
MultiServerCloseBatchWorker worker =
|
||||
new MultiServerCloseBatchWorker(clients, minAbsoluteRedundancy, payload, timestamp, signatures, POST_MESSAGE_RETRY_NUM, callback);
|
||||
new MultiServerCloseBatchWorker(clients, minAbsoluteRedundancy, identifier, timestamp, signatures, POST_MESSAGE_RETRY_NUM, callback);
|
||||
|
||||
// Submit job
|
||||
executorService.submit(worker);
|
||||
|
|
|
@ -46,7 +46,7 @@ public class MultiServerBeginBatchWorker extends MultiServerWorker<Iterable<Stri
|
|||
|
||||
if (remainingServers.decrementAndGet() <= 0){
|
||||
|
||||
if (minServers.get() <= 0) {
|
||||
if (minServers.decrementAndGet() <= 0) {
|
||||
MultiServerBeginBatchWorker.this.onSuccess(new MultiServerBatchIdentifier(identifiers));
|
||||
} else {
|
||||
MultiServerBeginBatchWorker.this.onFailure(new CommunicationException("Could not open batch in enough servers"));
|
||||
|
|
|
@ -3,21 +3,27 @@ package meerkat.bulletinboard.workers.multiserver;
|
|||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.protobuf.Timestamp;
|
||||
import meerkat.bulletinboard.AsyncBulletinBoardClient.BatchIdentifier;
|
||||
import meerkat.bulletinboard.BatchDataContainer;
|
||||
import meerkat.bulletinboard.MultiServerBatchIdentifier;
|
||||
import meerkat.bulletinboard.MultiServerWorker;
|
||||
import meerkat.bulletinboard.SingleServerBulletinBoardClient;
|
||||
import meerkat.crypto.DigitalSignature;
|
||||
import meerkat.protobuf.Crypto;
|
||||
import meerkat.protobuf.Crypto.Signature;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by Arbel Deutsch Peled on 27-Dec-15.
|
||||
*/
|
||||
public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker<BatchIdentifier> {
|
||||
public class MultiServerCloseBatchWorker extends MultiServerWorker<MultiServerBatchIdentifier, Boolean> {
|
||||
|
||||
private final Timestamp timestamp;
|
||||
private final Iterable<Signature> signatures;
|
||||
private final Iterable<Crypto.Signature> signatures;
|
||||
|
||||
public MultiServerCloseBatchWorker(List<SingleServerBulletinBoardClient> clients, int minServers,
|
||||
BatchIdentifier payload, Timestamp timestamp, Iterable<Signature> signatures,
|
||||
public MultiServerCloseBatchWorker(List<SingleServerBulletinBoardClient> clients,
|
||||
int minServers, MultiServerBatchIdentifier payload, Timestamp timestamp, Iterable<Crypto.Signature> signatures,
|
||||
int maxRetry, FutureCallback<Boolean> futureCallback) {
|
||||
|
||||
super(clients, minServers, payload, maxRetry, futureCallback);
|
||||
|
@ -28,8 +34,50 @@ public class MultiServerCloseBatchWorker extends MultiServerGenericPostWorker<Ba
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(SingleServerBulletinBoardClient client, BatchIdentifier payload) {
|
||||
client.closeBatch(payload, timestamp, signatures, this);
|
||||
public void run() {
|
||||
|
||||
Iterator<BatchIdentifier> identifierIterator = payload.getIdentifiers().iterator();
|
||||
|
||||
// Iterate through client
|
||||
|
||||
for (SingleServerBulletinBoardClient client : clients) {
|
||||
|
||||
if (identifierIterator.hasNext()) {
|
||||
|
||||
// Fetch the batch identifier supplied by the specific client (may be null if batch open failed on client
|
||||
|
||||
BatchIdentifier identifier = identifierIterator.next();
|
||||
|
||||
if (identifier != null) {
|
||||
|
||||
// Post the data with the matching identifier to the client
|
||||
client.closeBatch(identifier, timestamp, signatures, this);
|
||||
|
||||
} else {
|
||||
|
||||
// Count servers with no batch identifier as failed
|
||||
maxFailedServers.decrementAndGet();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Boolean result) {
|
||||
if (minServers.decrementAndGet() <= 0){
|
||||
succeed(result);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (maxFailedServers.decrementAndGet() <= 0){
|
||||
fail(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ public class MultiServerPostBatchDataWorker extends MultiServerWorker<BatchDataC
|
|||
if (identifier != null) {
|
||||
|
||||
// Post the data with the matching identifier to the client
|
||||
client.postBatchData(identifierIterator.next(), payload.batchChunkList, payload.startPosition, this);
|
||||
client.postBatchData(identifier, payload.batchChunkList, payload.startPosition, this);
|
||||
|
||||
} else {
|
||||
|
||||
|
|
|
@ -35,13 +35,16 @@ public class SingleServerBeginBatchWorker extends SingleServerWorker<BeginBatchM
|
|||
|
||||
try {
|
||||
|
||||
return response.readEntity(Int64Value.class);
|
||||
Int64Value result = response.readEntity(Int64Value.class);
|
||||
return result;
|
||||
|
||||
} catch (ProcessingException | IllegalStateException e) {
|
||||
|
||||
// Post to this server failed
|
||||
throw new CommunicationException("Could not contact the server");
|
||||
throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage());
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new CommunicationException("Could not contact the server. Original error: " + e.getMessage());
|
||||
}
|
||||
finally {
|
||||
response.close();
|
||||
|
|
|
@ -289,7 +289,7 @@ public class GenericBulletinBoardClientTester {
|
|||
/**
|
||||
* Tests the standard post, redundancy and read methods
|
||||
*/
|
||||
public void postTest() {
|
||||
public void testPost() {
|
||||
|
||||
byte[] b1 = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
|
||||
byte[] b2 = {(byte) 11, (byte) 12, (byte) 13, (byte) 14};
|
||||
|
@ -373,7 +373,12 @@ public class GenericBulletinBoardClientTester {
|
|||
final int CHUNK_SIZE = 10;
|
||||
final int TAG_NUM = 10;
|
||||
|
||||
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM);
|
||||
final Timestamp timestamp = Timestamp.newBuilder()
|
||||
.setSeconds(141515)
|
||||
.setNanos(859018)
|
||||
.build();
|
||||
|
||||
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, timestamp, BATCH_LENGTH, TAG_NUM);
|
||||
|
||||
// Begin batch
|
||||
|
||||
|
@ -387,7 +392,17 @@ public class GenericBulletinBoardClientTester {
|
|||
@Override
|
||||
public void onSuccess(Boolean result) {
|
||||
|
||||
bulletinBoardClient.closeBatch(identifier, msg.getMsg().getTimestamp(), msg.getSigList(), postCallback);
|
||||
bulletinBoardClient.closeBatch(identifier, msg.getMsg().getTimestamp(), msg.getSigList(), new FutureCallback<Boolean>() {
|
||||
@Override
|
||||
public void onSuccess(Boolean result) {
|
||||
jobSemaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
genericHandleFailure(t);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
@ -429,7 +444,12 @@ public class GenericBulletinBoardClientTester {
|
|||
final int CHUNK_SIZE = 99;
|
||||
final int TAG_NUM = 8;
|
||||
|
||||
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, BATCH_LENGTH, TAG_NUM);
|
||||
final Timestamp timestamp = Timestamp.newBuilder()
|
||||
.setSeconds(7776151)
|
||||
.setNanos(252616)
|
||||
.build();
|
||||
|
||||
final BulletinBoardMessage msg = generator.generateRandomMessage(signers, timestamp, BATCH_LENGTH, TAG_NUM);
|
||||
|
||||
// Post batch
|
||||
|
||||
|
|
|
@ -81,9 +81,9 @@ public class LocalBulletinBoardClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void postTest() {
|
||||
public void testPost() {
|
||||
|
||||
clientTest.postTest();
|
||||
clientTest.testPost();
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -66,9 +66,9 @@ public class ThreadedBulletinBoardClientIntegrationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void postTest() {
|
||||
public void testPost() {
|
||||
|
||||
clientTest.postTest();
|
||||
clientTest.testPost();
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue