Merge branch 'Cached-Client' of https://cs.idc.ac.il/rhodecode/meerkat/meerkat-java into Cached-Client

Cached-Client
arbel.peled 2016-04-13 10:38:43 +03:00
commit 48b2b9efa2
17 changed files with 473 additions and 123 deletions

View File

@ -1,168 +1,338 @@
package meerkat.bulletinboard; package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import meerkat.comm.CommunicationException; import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.*; import meerkat.protobuf.Voting.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
/** /**
* Created by Arbel Deutsch Peled on 03-Mar-16. * Created by Arbel Deutsch Peled on 03-Mar-16.
* This is a full-fledged implementation of a Bulletin Board Client * This is a full-fledged implementation of a Bulletin Board Client
* It provides asynchronous access to several remote servers, as well as a local cache * It provides asynchronous access to several remote servers, as well as a local cache
* Read/write operations are performed on the local server * Read operations are performed on the local server
* Batch reads are performed on the local server and, if they fail, also on the remote servers
* Write operations are performed first on the local server and then on the remotes
* After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated * After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated
* The database also employs a synchronizer which makes sure local data is sent to the remote servers * The database also employs a synchronizer which makes sure local data is sent to the remote servers
*/ */
public class CachedBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient {
private final BulletinBoardClient localClient; private final AsyncBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient; private AsyncBulletinBoardClient remoteClient;
private BulletinBoardSubscriber subscriber; private BulletinBoardSubscriber subscriber;
private final int threadPoolSize; private class SubscriptionStoreCallback implements FutureCallback<List<BulletinBoardMessage>> {
private final long failDelayInMilliseconds;
private final long subscriptionIntervalInMilliseconds;
public CachedBulletinBoardClient(BulletinBoardClient localClient, private final FutureCallback<?> callback;
int threadPoolSize,
long failDelayInMilliseconds, public SubscriptionStoreCallback(){
long subscriptionIntervalInMilliseconds) callback = null;
throws IllegalAccessException, InstantiationException { }
public SubscriptionStoreCallback(FutureCallback<?> callback){
this.callback = callback;
}
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
for (BulletinBoardMessage msg : result) {
try {
localClient.postMessage(msg);
} catch (CommunicationException ignored) {
// TODO: log
}
}
}
@Override
public void onFailure(Throwable t) {
if (callback != null) {
callback.onFailure(t); // This is some hard error that cannot be dealt with
}
}
}
/**
* Creates a Cached Client
* Assumes all parameters are initialized
* @param localClient is a Client for the local instance
* @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations
* @param subscriber is a subscription service to the remote instance(s)
*/
public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient,
AsyncBulletinBoardClient remoteClient,
BulletinBoardSubscriber subscriber) {
this.localClient = localClient; this.localClient = localClient;
this.threadPoolSize = threadPoolSize; this.remoteClient = remoteClient;
this.failDelayInMilliseconds = failDelayInMilliseconds; this.subscriber = subscriber;
this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds;
remoteClient = new ThreadedBulletinBoardClient();
} }
@Override @Override
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback) { public MessageID postMessage(final BulletinBoardMessage msg, final FutureCallback<Boolean> callback) {
return null;
}
@Override return localClient.postMessage(msg, new FutureCallback<Boolean>() {
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback) { @Override
return null; public void onSuccess(Boolean result) {
} remoteClient.postMessage(msg, callback);
}
@Override @Override
public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback<Boolean> callback) { public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, FutureCallback<Boolean> callback) { public MessageID postBatch(final CompleteBatch completeBatch, final FutureCallback<Boolean> callback) {
return localClient.postBatch(completeBatch, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatch(completeBatch, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) { public void beginBatch(final BeginBatchMessage beginBatchMessage, final FutureCallback<Boolean> callback) {
localClient.beginBatch(beginBatchMessage, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.beginBatch(beginBatchMessage, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, int startPosition, FutureCallback<Boolean> callback) { public void postBatchData(final byte[] signerId, final int batchId, final List<BatchData> batchDataList,
final int startPosition, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) { public void postBatchData(final byte[] signerId, final int batchId, final List<BatchData> batchDataList, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback<Boolean> callback) { public void postBatchData(final ByteString signerId, final int batchId, final List<BatchData> batchDataList,
final int startPosition, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(final ByteString signerId, final int batchId, final List<BatchData> batchDataList, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void closeBatch(final CloseBatchMessage closeBatchMessage, final FutureCallback<Boolean> callback) {
localClient.closeBatch(closeBatchMessage, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.closeBatch(closeBatchMessage, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} }
@Override @Override
public void getRedundancy(MessageID id, FutureCallback<Float> callback) { public void getRedundancy(MessageID id, FutureCallback<Float> callback) {
} remoteClient.getRedundancy(id, callback);
@Override
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
} }
@Override @Override
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback<CompleteBatch> callback) { public void readMessages(MessageFilterList filterList, final FutureCallback<List<BulletinBoardMessage>> callback) {
localClient.readMessages(filterList, callback);
subscriber.subscribe(filterList, new SubscriptionStoreCallback(callback));
}
@Override
public void readBatch(final BatchSpecificationMessage batchSpecificationMessage, final FutureCallback<CompleteBatch> callback) {
localClient.readBatch(batchSpecificationMessage, new FutureCallback<CompleteBatch>() {
@Override
public void onSuccess(CompleteBatch result) {
callback.onSuccess(result); // Read from local client was successful
}
@Override
public void onFailure(Throwable t) {
// Read from local unsuccessful: try to read from remote
remoteClient.readBatch(batchSpecificationMessage, new FutureCallback<CompleteBatch>() {
@Override
public void onSuccess(CompleteBatch result) {
// Read from remote was successful: store in local and return result
localClient.postBatch(result, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {}
@Override
public void onFailure(Throwable t) {}
});
callback.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
// Read from remote was unsuccessful: report error
callback.onFailure(t);
}
});
}
});
} }
@Override @Override
public void querySync(SyncQuery syncQuery, FutureCallback<SyncQueryResponse> callback) { public void querySync(SyncQuery syncQuery, FutureCallback<SyncQueryResponse> callback) {
localClient.querySync(syncQuery, callback);
} }
@Override @Override
public void init(BulletinBoardClientParams clientParams) { /**
* This is a stub method
remoteClient.init(clientParams); * All resources are assumed to be initialized
*/
ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize)); public void init(BulletinBoardClientParams clientParams) {}
List<SubscriptionAsyncBulletinBoardClient> subscriberClients = new ArrayList<>(clientParams.getBulletinBoardAddressCount());
for (String address : clientParams.getBulletinBoardAddressList()){
SubscriptionAsyncBulletinBoardClient newClient =
new SingleServerBulletinBoardClient(executorService, failDelayInMilliseconds, subscriptionIntervalInMilliseconds);
newClient.init(clientParams.toBuilder().clearBulletinBoardAddress().addBulletinBoardAddress(address).build());
subscriberClients.add(newClient);
}
subscriber = new ThreadedBulletinBoardSubscriber(subscriberClients, localClient);
}
@Override @Override
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException { public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
return null; localClient.postMessage(msg);
return remoteClient.postMessage(msg);
} }
@Override @Override
public float getRedundancy(MessageID id) { public float getRedundancy(MessageID id) {
return 0; return remoteClient.getRedundancy(id);
} }
@Override @Override
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) { public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
return null; subscriber.subscribe(filterList, new SubscriptionStoreCallback());
return localClient.readMessages(filterList);
} }
@Override @Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
return null; return localClient.generateSyncQuery(generateSyncQueryParams);
} }
@Override @Override
public void close() { public void close() {
localClient.close();
remoteClient.close();
} }
@Override @Override
public void subscribe(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) { public void subscribe(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, callback);
} }
@Override @Override
public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback<List<BulletinBoardMessage>> callback) { public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, startEntry, callback);
}
public int syncStatus(){
return 0;
}
public void reSync(){
} }
}
}

View File

@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit;
/** /**
* Created by Arbel Deutsch Peled on 15-Mar-16. * Created by Arbel Deutsch Peled on 15-Mar-16.
* This client is to be used mainly for testing. * This client wraps a BulletinBoardServer in an asynchronous client.
* It wraps a BulletinBoardServer in an asynchronous client. * It is meant to be used as a local cache handler and for testing purposes.
* This means the access to the server is direct (via method calls) instead of through a TCP connection. * This means the access to the server is direct (via method calls) instead of through a TCP connection.
* The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously. * The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously.
*/ */
public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient{ public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBoardClient {
private final BulletinBoardServer server; private final DeletableBulletinBoardServer server;
private final ListeningScheduledExecutorService executorService; private final ListeningScheduledExecutorService executorService;
private final BatchDigest digest; private final BatchDigest digest;
private final int subsrciptionDelay; private final int subsrciptionDelay;
@ -40,7 +40,7 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC
* @param threadNum is the number of concurrent threads to allocate for the client * @param threadNum is the number of concurrent threads to allocate for the client
* @param subscriptionDelay is the required delay between subscription calls in milliseconds * @param subscriptionDelay is the required delay between subscription calls in milliseconds
*/ */
public LocalBulletinBoardClient(BulletinBoardServer server, int threadNum, int subscriptionDelay) { public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) {
this.server = server; this.server = server;
this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum)); this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum));
this.digest = new GenericBatchDigest(new SHA256Digest()); this.digest = new GenericBatchDigest(new SHA256Digest());
@ -517,8 +517,30 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC
} }
@Override @Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException { public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
return server.generateSyncQuery(GenerateSyncQueryParams); return server.generateSyncQuery(generateSyncQueryParams);
}
@Override
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(msgID).getValue());
} catch (CommunicationException e) {
callback.onFailure(e);
}
}
@Override
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(entryNum).getValue());
} catch (CommunicationException e) {
callback.onFailure(e);
}
} }
@Override @Override

View File

@ -7,12 +7,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import meerkat.bulletinboard.workers.singleserver.*; import meerkat.bulletinboard.workers.singleserver.*;
import meerkat.comm.CommunicationException; import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.BulletinBoardClientParams; import meerkat.protobuf.Voting.BulletinBoardClientParams;
import meerkat.util.BulletinBoardUtils; import meerkat.util.BulletinBoardUtils;
import javax.ws.rs.NotFoundException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -30,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* If the list of servers contains more than one server: the server actually used is the first one * If the list of servers contains more than one server: the server actually used is the first one
* The class further implements a delayed access to the server after a communication error occurs * The class further implements a delayed access to the server after a communication error occurs
*/ */
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient { public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient {
private final int MAX_RETRIES = 11; private final int MAX_RETRIES = 11;

View File

@ -8,7 +8,6 @@ import meerkat.util.BulletinBoardUtils;
import static meerkat.protobuf.BulletinBoardAPI.FilterType.*; import static meerkat.protobuf.BulletinBoardAPI.FilterType.*;
import java.sql.Time;
import java.util.*; import java.util.*;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -19,11 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber { public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber {
protected final Collection<SubscriptionAsyncBulletinBoardClient> clients; protected final Collection<SubscriptionBulletinBoardClient> clients;
protected final BulletinBoardClient localClient; protected final BulletinBoardClient localClient;
protected Iterator<SubscriptionAsyncBulletinBoardClient> clientIterator; protected Iterator<SubscriptionBulletinBoardClient> clientIterator;
protected SubscriptionAsyncBulletinBoardClient currentClient; protected SubscriptionBulletinBoardClient currentClient;
private long lastServerSwitchTime; private long lastServerSwitchTime;
@ -32,7 +31,7 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f}; private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f};
public ThreadedBulletinBoardSubscriber(Collection<SubscriptionAsyncBulletinBoardClient> clients, BulletinBoardClient localClient) { public ThreadedBulletinBoardSubscriber(Collection<SubscriptionBulletinBoardClient> clients, BulletinBoardClient localClient) {
this.clients = clients; this.clients = clients;
this.localClient = localClient; this.localClient = localClient;

View File

@ -17,7 +17,6 @@ import java.util.*;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -38,7 +37,7 @@ public class GenericSubscriptionClientTester {
private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt"; private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt"; private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt";
private SubscriptionAsyncBulletinBoardClient bulletinBoardClient; private SubscriptionBulletinBoardClient bulletinBoardClient;
private Random random; private Random random;
private BulletinBoardMessageGenerator generator; private BulletinBoardMessageGenerator generator;
@ -46,7 +45,7 @@ public class GenericSubscriptionClientTester {
private Semaphore jobSemaphore; private Semaphore jobSemaphore;
private Vector<Throwable> thrown; private Vector<Throwable> thrown;
public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){ public GenericSubscriptionClientTester(SubscriptionBulletinBoardClient bulletinBoardClient){
this.bulletinBoardClient = bulletinBoardClient; this.bulletinBoardClient = bulletinBoardClient;

View File

@ -48,7 +48,7 @@ public class LocalBulletinBoardClientTest {
throw new CommunicationException(e.getCause() + " " + e.getMessage()); throw new CommunicationException(e.getCause() + " " + e.getMessage());
} }
BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider); DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider);
server.init(DB_NAME); server.init(DB_NAME);
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY); LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY);

View File

@ -51,6 +51,7 @@ dependencies {
compile 'org.xerial:sqlite-jdbc:3.8.+' compile 'org.xerial:sqlite-jdbc:3.8.+'
compile 'mysql:mysql-connector-java:5.1.+' compile 'mysql:mysql-connector-java:5.1.+'
compile 'com.h2database:h2:1.0.+' compile 'com.h2database:h2:1.0.+'
compile 'org.apache.commons:commons-dbcp2:2.0.+'
// Servlets // Servlets
compile 'javax.servlet:javax.servlet-api:3.0.+' compile 'javax.servlet:javax.servlet-api:3.0.+'

View File

@ -6,6 +6,7 @@ import java.util.*;
import com.google.protobuf.*; import com.google.protobuf.*;
import com.google.protobuf.Timestamp; import com.google.protobuf.Timestamp;
import com.sun.org.apache.xpath.internal.operations.Bool;
import meerkat.bulletinboard.*; import meerkat.bulletinboard.*;
import meerkat.bulletinboard.sqlserver.mappers.*; import meerkat.bulletinboard.sqlserver.mappers.*;
import static meerkat.bulletinboard.BulletinBoardConstants.*; import static meerkat.bulletinboard.BulletinBoardConstants.*;
@ -16,6 +17,7 @@ import meerkat.comm.MessageOutputStream;
import meerkat.crypto.concrete.ECDSASignature; import meerkat.crypto.concrete.ECDSASignature;
import meerkat.crypto.concrete.SHA256Digest; import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*; import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto.Signature; import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Crypto.SignatureVerificationKey; import meerkat.protobuf.Crypto.SignatureVerificationKey;
@ -38,7 +40,7 @@ import org.springframework.jdbc.support.KeyHolder;
/** /**
* This is a generic SQL implementation of the BulletinBoardServer API. * This is a generic SQL implementation of the BulletinBoardServer API.
*/ */
public class BulletinBoardSQLServer implements BulletinBoardServer{ public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
/** /**
* This interface provides the required implementation-specific data to enable an access to an actual SQL server. * This interface provides the required implementation-specific data to enable an access to an actual SQL server.
@ -67,6 +69,16 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
new int[] {Types.BLOB, Types.TIMESTAMP, Types.BLOB} new int[] {Types.BLOB, Types.TIMESTAMP, Types.BLOB}
), ),
DELETE_MSG_BY_ENTRY(
new String[] {"EntryNum"},
new int[] {Types.INTEGER}
),
DELETE_MSG_BY_ID(
new String[] {"MsgId"},
new int[] {Types.BLOB}
),
INSERT_NEW_TAG( INSERT_NEW_TAG(
new String[] {"Tag"}, new String[] {"Tag"},
new int[] {Types.VARCHAR} new int[] {Types.VARCHAR}
@ -529,6 +541,38 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
return postMessage(msg, true); // Perform a post and check the signature for authenticity return postMessage(msg, true); // Perform a post and check the signature for authenticity
} }
@Override
public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException {
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ID);
Map namedParameters = new HashMap();
namedParameters.put(QueryType.DELETE_MSG_BY_ID.getParamName(0),msgID);
int affectedRows = jdbcTemplate.update(sql, namedParameters);
//TODO: Log
return BoolMsg.newBuilder().setValue(affectedRows > 0).build();
}
@Override
public BoolMsg deleteMessage(long entryNum) throws CommunicationException {
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ENTRY);
Map namedParameters = new HashMap();
namedParameters.put(QueryType.DELETE_MSG_BY_ENTRY.getParamName(0),entryNum);
int affectedRows = jdbcTemplate.update(sql, namedParameters);
//TODO: Log
return BoolMsg.newBuilder().setValue(affectedRows > 0).build();
}
/** /**
* This is a container class for and SQL string builder and a MapSqlParameterSource to be used with it * This is a container class for and SQL string builder and a MapSqlParameterSource to be used with it

View File

@ -1,6 +1,7 @@
package meerkat.bulletinboard.sqlserver; package meerkat.bulletinboard.sqlserver;
import meerkat.protobuf.BulletinBoardAPI.FilterType; import meerkat.protobuf.BulletinBoardAPI.FilterType;
import org.apache.commons.dbcp2.BasicDataSource;
import org.h2.jdbcx.JdbcDataSource; import org.h2.jdbcx.JdbcDataSource;
import javax.naming.Context; import javax.naming.Context;
import javax.naming.InitialContext; import javax.naming.InitialContext;
@ -29,19 +30,28 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
switch(queryType) { switch(queryType) {
case ADD_SIGNATURE: case ADD_SIGNATURE:
return "INSERT INTO SignatureTable (EntryNum, SignerId, Signature)" return MessageFormat.format(
+ " SELECT DISTINCT :EntryNum AS Entry, :SignerId AS Id, :Signature AS Sig FROM UtilityTable AS Temp" "INSERT INTO SignatureTable (EntryNum, SignerId, Signature)"
+ " WHERE NOT EXISTS" + " SELECT DISTINCT :{0} AS Entry, :{1} AS Id, :{2} AS Sig FROM UtilityTable AS Temp"
+ " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.SignerId = :SignerId AND SubTable.EntryNum = :EntryNum)"; + " WHERE NOT EXISTS"
+ " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.EntryNum = :{0} AND SubTable.SignerId = :{1})",
QueryType.ADD_SIGNATURE.getParamName(0),
QueryType.ADD_SIGNATURE.getParamName(1),
QueryType.ADD_SIGNATURE.getParamName(2));
case CONNECT_TAG: case CONNECT_TAG:
return "INSERT INTO MsgTagTable (TagId, EntryNum)" return MessageFormat.format(
+ " SELECT DISTINCT TagTable.TagId, :EntryNum AS NewEntry FROM TagTable WHERE Tag = :Tag" "INSERT INTO MsgTagTable (TagId, EntryNum)"
+ " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId" + " SELECT DISTINCT TagTable.TagId, :{0} AS NewEntry FROM TagTable WHERE Tag = :{1}"
+ " AND SubTable.EntryNum = :EntryNum)"; + " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId"
+ " AND SubTable.EntryNum = :{0})",
QueryType.CONNECT_TAG.getParamName(0),
QueryType.CONNECT_TAG.getParamName(1));
case FIND_MSG_ID: case FIND_MSG_ID:
return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId"; return MessageFormat.format(
"SELECT EntryNum From MsgTable WHERE MsgId = :{0}",
QueryType.FIND_MSG_ID.getParamName(0));
case FIND_TAG_ID: case FIND_TAG_ID:
return MessageFormat.format( return MessageFormat.format(
@ -58,14 +68,32 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable"; return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable";
case GET_SIGNATURES: case GET_SIGNATURES:
return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum"; return MessageFormat.format(
"SELECT Signature FROM SignatureTable WHERE EntryNum = :{0}",
QueryType.GET_SIGNATURES.getParamName(0));
case INSERT_MSG: case INSERT_MSG:
return "INSERT INTO MsgTable (MsgId, Msg) VALUES(:MsgId,:Msg)"; return MessageFormat.format(
"INSERT INTO MsgTable (MsgId, ExactTime, Msg) VALUES(:{0}, :{1}, :{2})",
QueryType.INSERT_MSG.getParamName(0),
QueryType.INSERT_MSG.getParamName(1),
QueryType.INSERT_MSG.getParamName(2));
case DELETE_MSG_BY_ENTRY:
return MessageFormat.format(
"DELETE FROM MsgTable WHERE EntryNum = :{0}",
QueryType.DELETE_MSG_BY_ENTRY.getParamName(0));
case DELETE_MSG_BY_ID:
return MessageFormat.format(
"DELETE FROM MsgTable WHERE MsgId = :{0}",
QueryType.DELETE_MSG_BY_ID.getParamName(0));
case INSERT_NEW_TAG: case INSERT_NEW_TAG:
return "INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE" return MessageFormat.format(
+ " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :Tag)"; "INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE"
+ " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :{0})",
QueryType.INSERT_NEW_TAG.getParamName(0));
case GET_LAST_MESSAGE_ENTRY: case GET_LAST_MESSAGE_ENTRY:
return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable"; return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable";
@ -200,10 +228,13 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
@Override @Override
public DataSource getDataSource() { public DataSource getDataSource() {
JdbcDataSource dataSource = new JdbcDataSource(); BasicDataSource dataSource = new BasicDataSource();
dataSource.setURL("jdbc:h2:~/" + dbName);
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("jdbc:h2:~/" + dbName);
return dataSource; return dataSource;
} }

View File

@ -1,9 +1,8 @@
package meerkat.bulletinboard.sqlserver; package meerkat.bulletinboard.sqlserver;
import com.mysql.jdbc.jdbc2.optional.MysqlDataSource;
import meerkat.bulletinboard.BulletinBoardConstants;
import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider; import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider;
import meerkat.protobuf.BulletinBoardAPI.FilterType; import meerkat.protobuf.BulletinBoardAPI.FilterType;
import org.apache.commons.dbcp2.BasicDataSource;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.text.MessageFormat; import java.text.MessageFormat;
@ -80,6 +79,16 @@ public class MySQLQueryProvider implements SQLQueryProvider {
QueryType.INSERT_MSG.getParamName(1), QueryType.INSERT_MSG.getParamName(1),
QueryType.INSERT_MSG.getParamName(2)); QueryType.INSERT_MSG.getParamName(2));
case DELETE_MSG_BY_ENTRY:
return MessageFormat.format(
"DELETE IGNORE FROM MsgTable WHERE EntryNum = :{0}",
QueryType.DELETE_MSG_BY_ENTRY.getParamName(0));
case DELETE_MSG_BY_ID:
return MessageFormat.format(
"DELETE IGNORE FROM MsgTable WHERE MsgId = :{0}",
QueryType.DELETE_MSG_BY_ID.getParamName(0));
case INSERT_NEW_TAG: case INSERT_NEW_TAG:
return MessageFormat.format( return MessageFormat.format(
"INSERT IGNORE INTO TagTable(Tag) VALUES (:{0})", "INSERT IGNORE INTO TagTable(Tag) VALUES (:{0})",
@ -216,16 +225,17 @@ public class MySQLQueryProvider implements SQLQueryProvider {
@Override @Override
public DataSource getDataSource() { public DataSource getDataSource() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setServerName(dbAddress); BasicDataSource dataSource = new BasicDataSource();
dataSource.setPort(dbPort);
dataSource.setDatabaseName(dbName); dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUser(username); dataSource.setUrl("jdbc:mysql://" + dbAddress + ":" + dbPort + "/" + dbName);
dataSource.setUsername(username);
dataSource.setPassword(password); dataSource.setPassword(password);
dataSource.setAllowMultiQueries(true);
return dataSource; return dataSource;
} }
@Override @Override

View File

@ -140,6 +140,16 @@ public class H2BulletinBoardServerTest {
} }
@Test
public void testSyncQuery() {
try {
serverTest.testSyncQuery();
} catch (Exception e) {
System.err.println(e.getMessage());
fail(e.getMessage());
}
}
@After @After
public void close() { public void close() {
System.err.println("Starting to close H2BulletinBoardServerTest"); System.err.println("Starting to close H2BulletinBoardServerTest");

View File

@ -47,12 +47,12 @@ public interface BulletinBoardClient {
/** /**
* Create a SyncQuery to test against that corresponds with the current server state for a specific filter list * Create a SyncQuery to test against that corresponds with the current server state for a specific filter list
* Should only be called on instances for which the actual server contacted is known (i.e. there is only one server) * Should only be called on instances for which the actual server contacted is known (i.e. there is only one server)
* @param GenerateSyncQueryParams defines the required information needed to generate the query * @param generateSyncQueryParams defines the required information needed to generate the query
* These are represented as fractions of the total number of relevant messages * These are represented as fractions of the total number of relevant messages
* @return The generated SyncQuery * @return The generated SyncQuery
* @throws CommunicationException when no DB can be contacted * @throws CommunicationException when no DB can be contacted
*/ */
SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException; SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException;
/** /**
* Closes all connections, if any. * Closes all connections, if any.

View File

@ -0,0 +1,30 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
* This interface is meant to extend a BulletinBoardClient interface/class
* It provides it with the ability to delete messages from the Server
* This assumes the Server implements the {@link DeletableBulletinBoardServer}
*/
public interface BulletinBoardMessageDeleter {
/**
* Deletes a message from a Bulletin Board Server
* @param msgID is the ID of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback);
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback);
}

View File

@ -0,0 +1,29 @@
package meerkat.bulletinboard;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
*/
public interface DeletableBulletinBoardServer extends BulletinBoardServer {
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param msgID is the ID of the message to delete
* @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist
* @throws CommunicationException in case of an error
*/
public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException;
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist
* @throws CommunicationException in case of an error
*/
public BoolMsg deleteMessage(long entryNum) throws CommunicationException;
}

View File

@ -0,0 +1,7 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
*/
public interface DeletableSubscriptionBulletinBoardClient extends SubscriptionBulletinBoardClient, BulletinBoardMessageDeleter {
}

View File

@ -1,7 +0,0 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 03-Mar-16.
*/
public interface SubscriptionAsyncBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber {
}

View File

@ -0,0 +1,7 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 03-Mar-16.
*/
public interface SubscriptionBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber {
}