Working version of ThreadedBulletinBoardClient.

The integration test also passes with SQLite and MySQL engines.
Bulletin-Board-Client-phase_1
Arbel Deutsch Peled 2015-12-14 09:45:40 +02:00
parent bfc62cd77c
commit 79d29a05d3
4 changed files with 39 additions and 17 deletions

View File

@ -1,6 +1,5 @@
package meerkat.bulletinboard;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import meerkat.comm.CommunicationException;
import meerkat.crypto.Digest;
@ -126,7 +125,7 @@ public class BulletinClientWorker implements Callable<BulletinClientJobResult> {
msg = MessageFilterList.newBuilder()
.addFilter(MessageFilter.newBuilder()
.setType(FilterType.MSG_ID)
.setId(payload.toByteString())
.setId(((MessageID) payload).getID())
.build()
).build();

View File

@ -35,6 +35,8 @@ public class PostMessageFutureCallback extends ClientFutureCallback {
if (job.getMinServers() > 0 && job.isRetry()) {
Futures.addCallback(listeningExecutor.submit(new BulletinClientWorker(job)), this);
}
callback.handleCallback(null);
}
@Override

View File

@ -1,5 +1,4 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import meerkat.bulletinboard.BulletinBoardClient;
import meerkat.bulletinboard.BulletinBoardClient.ClientCallback;
import meerkat.bulletinboard.ThreadedBulletinBoardClient;
@ -12,32 +11,33 @@ import meerkat.util.BulletinBoardMessageComparator;
import org.junit.Before;
import org.junit.Test;
import static java.lang.Thread.sleep;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.number.OrderingComparison.*;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.concurrent.Semaphore;
/**
* Created by Arbel Deutsch Peled on 05-Dec-15.
*/
public class BulletinBoardClientIntegrationTest {
Semaphore jobSemaphore;
Vector<Throwable> thrown;
private class PostCallback implements ClientCallback<Object>{
@Override
public void handleCallback(Object msg) {
System.err.println("Post operation completed");
jobSemaphore.release();
}
@Override
public void handleFailure(Throwable t) {
System.err.println(t.getCause() + "\n" + t.getMessage());
assert false;
thrown.add(t);
jobSemaphore.release();
}
}
@ -51,13 +51,15 @@ public class BulletinBoardClientIntegrationTest {
@Override
public void handleCallback(Float redundancy) {
System.err.println("Redundancy found is: " + redundancy);
jobSemaphore.release();
assertThat(redundancy, greaterThanOrEqualTo(minRedundancy));
}
@Override
public void handleFailure(Throwable t) {
System.err.println(t.getCause() + "\n" + t.getMessage());
assert false;
thrown.add(t);
jobSemaphore.release();
}
}
@ -71,6 +73,10 @@ public class BulletinBoardClientIntegrationTest {
@Override
public void handleCallback(List<BulletinBoardMessage> messages) {
System.err.println(messages);
jobSemaphore.release();
BulletinBoardMessageComparator msgComparator = new BulletinBoardMessageComparator();
assertThat(messages.size(), is(expectedMsgList.size()));
@ -86,8 +92,8 @@ public class BulletinBoardClientIntegrationTest {
@Override
public void handleFailure(Throwable t) {
System.err.println(t.getCause() + "\n" + t.getMessage());
assert false;
thrown.add(t);
jobSemaphore.release();
}
}
@ -117,6 +123,9 @@ public class BulletinBoardClientIntegrationTest {
postCallback = new PostCallback();
redundancyCallback = new RedundancyCallback((float) 1.0);
thrown = new Vector<>();
jobSemaphore = new Semaphore(0);
}
@Test
@ -157,9 +166,9 @@ public class BulletinBoardClientIntegrationTest {
messageID = bulletinBoardClient.postMessage(msg,postCallback);
try {
sleep(2000);
jobSemaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println(e.getCause() + " " + e.getMessage());
}
bulletinBoardClient.getRedundancy(messageID,redundancyCallback);
@ -185,9 +194,21 @@ public class BulletinBoardClientIntegrationTest {
readCallback = new ReadCallback(msgList);
bulletinBoardClient.readMessages(filterList, readCallback);
try {
jobSemaphore.acquire(2);
} catch (InterruptedException e) {
System.err.println(e.getCause() + " " + e.getMessage());
}
bulletinBoardClient.close();
for (Throwable t : thrown) {
System.err.println(t.getMessage());
}
if (thrown.size() > 0) {
assert false;
}
}
}

View File

@ -31,7 +31,7 @@
<param-value>mypass</param-value></context-param>
<context-param>
<param-name>dbType</param-name>
<param-value>H2</param-value></context-param>
<param-value>SQLite</param-value></context-param>
<listener>
<listener-class>meerkat.bulletinboard.webapp.BulletinBoardWebApp</listener-class>
</listener>