Working integrated version of Scanner WebApp

Fully testsed
Moved BoolMsg and IntMsg to Comm package (from BulletinBoardAPI)
PollingStation-ScannerWebApp
arbel.peled 2016-05-31 15:26:56 +03:00
parent 061dc69fbc
commit 347e826f73
20 changed files with 374 additions and 121 deletions

View File

@ -1,21 +1,14 @@
package meerkat.bulletinboard;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageInputStream;
import meerkat.crypto.Digest;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import meerkat.protobuf.Voting.*;
import meerkat.rest.*;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.ws.rs.client.Client;

View File

@ -6,7 +6,7 @@ import static meerkat.bulletinboard.BulletinBoardConstants.CLOSE_BATCH_PATH;
/**
* Created by Arbel Deutsch Peled on 27-Dec-15.
* Tries to contact server once and perform a close batch operation
* Tries to contact server once and perform a stop batch operation
*/
public class SingleServerCloseBatchWorker extends SingleServerGenericPostWorker<CloseBatchMessage> {

View File

@ -2,7 +2,7 @@ package meerkat.bulletinboard.workers.singleserver;
import meerkat.bulletinboard.SingleServerWorker;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.BoolMsg;
import meerkat.protobuf.Comm.*;
import meerkat.rest.Constants;
import javax.ws.rs.ProcessingException;

View File

@ -508,7 +508,7 @@ public class GenericBulletinBoardClientTester {
.build())
.build();
// Try to close the (unopened) batch;
// Try to stop the (unopened) batch;
bulletinBoardClient.closeBatch(closeBatchMessage, failPostCallback);

View File

@ -17,6 +17,7 @@ import meerkat.crypto.concrete.ECDSASignature;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Crypto.SignatureVerificationKey;
@ -27,7 +28,6 @@ import javax.sql.DataSource;
import meerkat.util.BulletinBoardUtils;
import meerkat.util.TimestampComparator;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;

View File

@ -15,8 +15,8 @@ import meerkat.bulletinboard.sqlserver.MySQLQueryProvider;
import meerkat.bulletinboard.sqlserver.SQLiteQueryProvider;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageOutputStream;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import static meerkat.bulletinboard.BulletinBoardConstants.*;
import static meerkat.rest.Constants.*;

View File

@ -8,6 +8,7 @@ import com.google.protobuf.Timestamp;
import meerkat.comm.MessageInputStream;
import meerkat.protobuf.Crypto.*;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import static meerkat.bulletinboard.BulletinBoardConstants.*;
import meerkat.rest.Constants;
import meerkat.rest.ProtobufMessageBodyReader;
@ -20,7 +21,6 @@ import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.util.List;

View File

@ -28,8 +28,8 @@ import meerkat.crypto.Digest;
import meerkat.crypto.concrete.ECDSASignature;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import meerkat.util.BulletinBoardMessageGenerator;
import org.h2.util.DateTimeUtils;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;

View File

@ -3,6 +3,7 @@ package meerkat.bulletinboard;
import meerkat.comm.CommunicationException;
import meerkat.comm.MessageOutputStream;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Comm.*;
import java.util.Collection;
@ -62,8 +63,8 @@ public interface BulletinBoardServer{
public BoolMsg postBatchMessage(BatchMessage batchMessage) throws CommunicationException;
/**
* Attempts to close and finalize a batch message
* @param message contains the data necessary to close the batch; in particular: the signature for the batch
* Attempts to stop and finalize a batch message
* @param message contains the data necessary to stop the batch; in particular: the signature for the batch
* @return TRUE if the batch was successfully closed, FALSE otherwise
* Specifically, if the signature is invalid or if some of the batch parts have not yet been submitted: the value returned will be FALSE
* @throws CommunicationException on DB connection error

View File

@ -7,9 +7,7 @@ public interface PollingStationConstants {
// Relative addresses for Scanner operations
public static final String POLLING_STATION_WEB_SCANNER_PATH = "/scanner";
public static final String POLLING_STATION_WEB_SCANNER_SCAN_PATH = "/scan";
public static final String POLLING_STATION_WEB_SCANNER_ERROR_PATH = "/error";
}

View File

@ -1,29 +1,61 @@
package meerkat.pollingstation;
import com.google.common.util.concurrent.FutureCallback;
import meerkat.protobuf.Comm.BoolMsg;
import meerkat.protobuf.PollingStation.*;
/**
* Created by Arbel on 05/05/2016.
* An interface for the scanner used by the Polling Station Committee
* The scanner works as a producer, while the polling station is the consumer
* That is to say: scans are pushed from the scanner rather than requested by the polling station
*/
public interface PollingStationScanner {
/**
* Subscribes to new scans
* @param scanCallback is the handler for scanned data
* An interface for processing scans (Polling Station side)
*/
public void subscribe(FutureCallback<ScannedData> scanCallback);
public interface Consumer {
/**
* Sets up the connection to the scanner and begins receiving scans
* @throws Exception when the operation fails
*/
public void start() throws Exception;
/**
* Closes the connection to the scanner
* @throws Exception when the operation fails
*/
public void stop() throws Exception;
/**
* Subscribes to new scans
*
* @param scanCallback is the handler for scanned data
*/
public void subscribe(FutureCallback<ScannedData> scanCallback);
}
/**
* Sends a scan to all subscribers
* @param scannedData contains the scanned data
* An interface for submitting scanned data (scanner side)
*/
public void newScan(ScannedData scannedData);
public interface Producer {
/**
* Notifies subscribers about an error that occurred during scan
* @param errorMsg is the error that occurred
*/
public void reportScanError(ErrorMsg errorMsg);
/**
* Sends a scan to all subscribers
* @param scannedData contains the scanned data
* @return a BoolMsg containing TRUE iff the scanned data has been sent to at least one subscriber
*/
public BoolMsg newScan(ScannedData scannedData);
/**
* Notifies subscribers about an error that occurred during scan
* @param errorMsg is the error that occurred
* @return a BoolMsg containing TRUE iff the error has been sent to at least one subscriber
*/
public BoolMsg reportScanError(ErrorMsg errorMsg);
}
}

View File

@ -7,14 +7,6 @@ option java_package = "meerkat.protobuf";
import 'meerkat/crypto.proto';
import 'google/protobuf/timestamp.proto';
message BoolMsg {
bool value = 1;
}
message IntMsg {
int32 value = 1;
}
message MessageID {
// The ID of a message for unique retrieval.
// Note that it is assumed that this ID is a function of the message itself.

View File

@ -12,4 +12,11 @@ message ScannedData {
// Container for error messages
message ErrorMsg {
string msg = 1;
}
// Container for HTTP address
message HTTPAddress {
string hostname = 1;
int32 port = 2;
string address = 3;
}

View File

@ -11,3 +11,11 @@ message BroadcastMessage {
bytes payload = 5;
}
message BoolMsg {
bool value = 1;
}
message IntMsg {
int32 value = 1;
}

View File

@ -2,10 +2,8 @@
plugins {
id "us.kirchmeier.capsule" version "1.0.1"
id 'com.google.protobuf' version '0.7.0'
id 'org.akhikhl.gretty' version "1.2.4"
}
apply plugin: 'org.akhikhl.gretty'
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
@ -46,10 +44,11 @@ dependencies {
compile project(':restful-api-common')
// Jersey for RESTful API
compile 'org.glassfish.jersey.containers:jersey-container-servlet:2.22.+'
compile 'org.glassfish.jersey.containers:jersey-container-servlet:2.5.+'
// Servlets
compile 'javax.servlet:javax.servlet-api:3.0.+'
compile 'org.eclipse.jetty:jetty-server:9.3.+'
compile 'org.eclipse.jetty:jetty-servlet:9.3.+'
// Logging
compile 'org.slf4j:slf4j-api:1.7.7'

View File

@ -0,0 +1,89 @@
package meerkat.pollingstation;
/**
* Created by Arbel on 5/31/2016.
*/
import com.google.common.util.concurrent.FutureCallback;
import meerkat.protobuf.Comm;
import meerkat.protobuf.PollingStation;
import javax.annotation.PostConstruct;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import java.io.IOException;
import java.util.Iterator;
import static meerkat.pollingstation.PollingStationConstants.POLLING_STATION_WEB_SCANNER_ERROR_PATH;
import static meerkat.pollingstation.PollingStationConstants.POLLING_STATION_WEB_SCANNER_SCAN_PATH;
import static meerkat.rest.Constants.MEDIATYPE_PROTOBUF;
/**
* Implements a Web-App interface for {@link meerkat.pollingstation.PollingStationScanner.Producer}
* This class depends on {@link meerkat.pollingstation.PollingStationWebScanner} and works in conjunction with it
*/
@Path("/")
public class PollingStationScannerWebApp implements PollingStationScanner.Producer {
@Context
ServletContext servletContext;
Iterator<FutureCallback<PollingStation.ScannedData>> callbacks;
/**
* This method is called by the Jetty engine when instantiating the servlet
*/
@PostConstruct
public void init() throws Exception{
callbacks = ((PollingStationWebScanner.CallbackAccessor) servletContext.getAttribute(PollingStationWebScanner.CALLBACKS_ATTRIBUTE_NAME)).getCallbackIterator();
}
@POST
@Path(POLLING_STATION_WEB_SCANNER_SCAN_PATH)
@Consumes(MEDIATYPE_PROTOBUF)
@Produces(MEDIATYPE_PROTOBUF)
@Override
public Comm.BoolMsg newScan(PollingStation.ScannedData scannedData) {
boolean handled = false;
while (callbacks.hasNext()){
callbacks.next().onSuccess(scannedData);
handled = true;
}
return Comm.BoolMsg.newBuilder()
.setValue(handled)
.build();
}
@POST
@Path(POLLING_STATION_WEB_SCANNER_ERROR_PATH)
@Consumes(MEDIATYPE_PROTOBUF)
@Produces(MEDIATYPE_PROTOBUF)
@Override
public Comm.BoolMsg reportScanError(PollingStation.ErrorMsg errorMsg) {
boolean handled = false;
while (callbacks.hasNext()){
callbacks.next().onFailure(new IOException(errorMsg.getMsg()));
handled = true;
}
return Comm.BoolMsg.newBuilder()
.setValue(handled)
.build();
}
}

View File

@ -1,27 +1,57 @@
package meerkat.pollingstation;
import com.google.common.util.concurrent.FutureCallback;
import meerkat.protobuf.PollingStation;
import static meerkat.rest.Constants.*;
import javax.ws.rs.*;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.List;
import java.util.LinkedList;
import com.google.common.util.concurrent.FutureCallback;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.*;
import static meerkat.pollingstation.PollingStationConstants.*;
import meerkat.protobuf.PollingStation.*;
import org.glassfish.jersey.servlet.ServletContainer;
import org.glassfish.jersey.server.ResourceConfig;
import meerkat.protobuf.PollingStation.ScannedData;
import meerkat.rest.*;
/**
* Created by Arbel on 05/05/2016.
*/
@Path(POLLING_STATION_WEB_SCANNER_PATH)
public class PollingStationWebScanner implements PollingStationScanner{
public class PollingStationWebScanner implements PollingStationScanner.Consumer{
public final static String CALLBACKS_ATTRIBUTE_NAME = "callbacks";
private final Server server;
private final List<FutureCallback<ScannedData>> callbacks;
public PollingStationWebScanner() {
public PollingStationWebScanner(HTTPAddress address) {
callbacks = new LinkedList<>();
server = new Server(address.getPort());
ServletContextHandler servletContextHandler = new ServletContextHandler(server, address.getAddress());
servletContextHandler.setAttribute(CALLBACKS_ATTRIBUTE_NAME, new CallbackAccessor());
ResourceConfig resourceConfig = new ResourceConfig(PollingStationScannerWebApp.class);
resourceConfig.register(ProtobufMessageBodyReader.class);
resourceConfig.register(ProtobufMessageBodyWriter.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(resourceConfig));
servletContextHandler.addServlet(servletHolder, "/*");
}
@Override
public void start() throws Exception {
server.start();
}
@Override
public void stop() throws Exception {
server.stop();
}
@Override
@ -29,43 +59,10 @@ public class PollingStationWebScanner implements PollingStationScanner{
callbacks.add(scanCallback);
}
@Path(POLLING_STATION_WEB_SCANNER_SCAN_PATH)
@POST
@Consumes(MEDIATYPE_PROTOBUF)
@Produces(MEDIATYPE_PROTOBUF)
@Override
public void newScan(ScannedData scannedData) {
if (callbacks.size() <= 0)
throw new RuntimeException("No subscribers to forward scan to!");
for (FutureCallback<ScannedData> callback : callbacks){
callback.onSuccess(scannedData);
public class CallbackAccessor {
public Iterator<FutureCallback<ScannedData>> getCallbackIterator() {
return callbacks.iterator();
}
}
@Path(POLLING_STATION_WEB_SCANNER_ERROR_PATH)
@POST
@Consumes(MEDIATYPE_PROTOBUF)
@Produces(MEDIATYPE_PROTOBUF)
@Override
public void reportScanError(PollingStation.ErrorMsg errorMsg) {
if (callbacks.size() <= 0)
throw new RuntimeException("No subscribers to forward error to!");
for (FutureCallback<ScannedData> callback : callbacks){
callback.onFailure(new IOException(errorMsg.getMsg()));
}
}
@Path("/test")
@GET
public String test(){
return "test";
}
}

View File

@ -1,12 +0,0 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
<Configure class="org.eclipse.jetty.webapp.WebAppContext">
<Call name="setAttribute">
<Arg>org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern</Arg>
<Arg>none</Arg>
</Call>
<Call name="setAttribute">
<Arg>org.eclipse.jetty.server.webapp.WebInfIncludeJarPattern</Arg>
<Arg>none</Arg>
</Call>
</Configure>

View File

@ -1,20 +0,0 @@
<web-app>
<servlet>
<servlet-name>Jersey Hello World</servlet-name>
<servlet-class>
org.glassfish.jersey.servlet.ServletContainer
</servlet-class>
<init-param>
<param-name>jersey.config.server.provider.packages</param-name>
<param-value>meerkat</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>Jersey Hello World</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
<listener>
<listener-class>meerkat.pollingstation.PollingStationWebScanner</listener-class>
</listener>
</web-app>

View File

@ -0,0 +1,169 @@
package meerkat.pollingstation;
import com.google.common.util.concurrent.FutureCallback;
import com.google.protobuf.ByteString;
import com.sun.org.apache.regexp.internal.RE;
import meerkat.protobuf.PollingStation;
import meerkat.protobuf.PollingStation.*;
import meerkat.rest.Constants;
import meerkat.rest.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import static meerkat.pollingstation.PollingStationConstants.*;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Created by Arbel on 25/05/2016.
*/
public class PollingStationWebScannerTest {
private PollingStationScanner.Consumer scanner;
private HTTPAddress httpAddress;
private Semaphore semaphore;
private Throwable thrown;
private boolean dataIsAsExpected;
private class ScanHandler implements FutureCallback<ScannedData> {
private final ScannedData expectedData;
public ScanHandler(ScannedData expectedData) {
this.expectedData = expectedData;
}
@Override
public void onSuccess(ScannedData result) {
dataIsAsExpected = result.getData().equals(expectedData.getData());
semaphore.release();
}
@Override
public void onFailure(Throwable t) {
dataIsAsExpected = false;
thrown = t;
semaphore.release();
}
}
private class ErrorHandler implements FutureCallback<ScannedData> {
private final String expectedErrorMessage;
public ErrorHandler(String expectedErrorMessage) {
this.expectedErrorMessage = expectedErrorMessage;
}
@Override
public void onSuccess(ScannedData result) {
dataIsAsExpected = false;
semaphore.release();
}
@Override
public void onFailure(Throwable t) {
dataIsAsExpected = t.getMessage().equals(expectedErrorMessage);
semaphore.release();
}
}
@Before
public void init() {
System.err.println("Setting up Scanner WebApp!");
httpAddress = HTTPAddress.newBuilder()
.setPort(8080)
.setHostname("http://localhost")
.build();
scanner = new PollingStationWebScanner(httpAddress);
semaphore = new Semaphore(0);
thrown = null;
try {
scanner.start();
} catch (Exception e) {
assertThat("Could not start server: " + e.getMessage(), false);
}
}
@Test
public void testSuccessfulScan() throws InterruptedException {
Client client = ClientBuilder.newClient();
client.register(ProtobufMessageBodyReader.class);
client.register(ProtobufMessageBodyWriter.class);
WebTarget webTarget = client.target(httpAddress.getHostname() + ":" + httpAddress.getPort())
.path(httpAddress.getAddress()).path(POLLING_STATION_WEB_SCANNER_SCAN_PATH);
byte[] data = {(byte) 1, (byte) 2};
ScannedData scannedData = ScannedData.newBuilder()
.setData(ByteString.copyFrom(data))
.build();
scanner.subscribe(new ScanHandler(scannedData));
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(scannedData, Constants.MEDIATYPE_PROTOBUF));
response.close();
semaphore.acquire();
assertThat("Scanner has thrown an error", thrown == null);
assertThat("Scanned data received was incorrect", dataIsAsExpected);
}
@Test
public void testErroneousScan() throws InterruptedException {
Client client = ClientBuilder.newClient();
client.register(ProtobufMessageBodyReader.class);
client.register(ProtobufMessageBodyWriter.class);
WebTarget webTarget = client.target(httpAddress.getHostname() + ":" + httpAddress.getPort())
.path(httpAddress.getAddress()).path(POLLING_STATION_WEB_SCANNER_ERROR_PATH);
ErrorMsg errorMsg = ErrorMsg.newBuilder()
.setMsg("!Error Message!")
.build();
scanner.subscribe(new ErrorHandler(errorMsg.getMsg()));
Response response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(errorMsg, Constants.MEDIATYPE_PROTOBUF));
response.close();
semaphore.acquire();
assertThat("Scanner error received was incorrect", dataIsAsExpected);
}
@After
public void close() {
System.err.println("Scanner WebApp shutting down...");
try {
scanner.stop();
} catch (Exception e) {
assertThat("Could not stop server: " + e.getMessage(), false);
}
}
}