First working version of Threaded Bulletin Board Client.

Tests do not report well.
Bulletin-Board-Client-phase_1
Arbel Deutsch Peled 2015-12-12 22:45:31 +02:00
parent 13733e6610
commit 4f2d0e7738
5 changed files with 37 additions and 9 deletions

View File

@ -37,6 +37,10 @@ public class BulletinClientJob {
this.maxRetry = maxRetry;
}
public void updateServerAddresses(List<String> newServerAdresses) {
this.serverAddresses = newServerAdresses;
}
public List<String> getServerAddresses() {
return serverAddresses;
}
@ -57,10 +61,6 @@ public class BulletinClientJob {
return maxRetry;
}
public Iterator<String> getAddressIterator() {
return serverAddresses.iterator();
}
public void shuffleAddresses() {
Collections.shuffle(serverAddresses);
}

View File

@ -17,6 +17,8 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
/**
@ -80,6 +82,8 @@ public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
String requestPath;
Message msg;
List<String> serverAddresses = new LinkedList<String>(job.getServerAddresses());
Message payload = job.getPayload();
BulletinBoardMessageList msgList;
@ -114,7 +118,7 @@ public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
case GET_REDUNDANCY:
// Make sure the payload is a MessageId
if (!(payload instanceof MessageID)) {
throw new IllegalArgumentException("Cannot search for an object that is not an instance of BulletinBoardMessage");
throw new IllegalArgumentException("Cannot search for an object that is not an instance of MessageID");
}
requestPath = Constants.READ_MESSAGES_PATH;
@ -135,7 +139,7 @@ public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
// Iterate through servers
Iterator<String> addressIterator = job.getAddressIterator();
Iterator<String> addressIterator = serverAddresses.iterator();
while (addressIterator.hasNext()) {
@ -192,7 +196,8 @@ public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
case POST_MESSAGE:
// The job now contains the information required to ascertain whether enough server posts have succeeded
// It also contains the list of servers in which the post was not successful
// It will also contain the list of servers in which the post was not successful
job.updateServerAddresses(serverAddresses);
return new BulletinClientJobResult(job, null);
case GET_REDUNDANCY:

View File

@ -13,6 +13,7 @@ import meerkat.protobuf.Voting;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by Arbel Deutsch Peled on 05-Dec-15.
@ -107,7 +108,7 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
public void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback) {
// Create job
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.GET_REDUNDANCY,
BulletinClientJob job = new BulletinClientJob(meerkatDBs, minAbsoluteRedundancy, BulletinClientJob.JobType.READ_MESSAGES,
filterList, READ_MESSAGES_RETRY_NUM);
// Submit job and create callback
@ -115,4 +116,16 @@ public class ThreadedBulletinBoardClient implements BulletinBoardClient {
}
@Override
public void close() {
try {
listeningExecutor.shutdown();
while (! listeningExecutor.isShutdown()) {
listeningExecutor.awaitTermination(10, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
System.err.println(e.getCause() + " " + e.getMessage());
}
}
}

View File

@ -30,7 +30,9 @@ public class BulletinBoardClientIntegrationTest {
private class PostCallback implements ClientCallback<Object>{
@Override
public void handleCallback(Object msg) {}
public void handleCallback(Object msg) {
System.err.println("Post operation completed");
}
@Override
public void handleFailure(Throwable t) {
@ -184,6 +186,8 @@ public class BulletinBoardClientIntegrationTest {
bulletinBoardClient.readMessages(filterList, readCallback);
bulletinBoardClient.close();
}
}

View File

@ -45,4 +45,10 @@ public interface BulletinBoardClient {
*/
void readMessages(MessageFilterList filterList, ClientCallback<List<BulletinBoardMessage>> callback);
/**
* Closes all connections, if any.
* This is done in a synchronous (blocking) way.
*/
void close();
}