Added Deletion to Bulletin Board Server and Local Client

Cached-Client
Arbel Deutsch Peled 2016-04-13 09:46:24 +03:00
parent edfd47a98d
commit c806e7b32a
15 changed files with 448 additions and 115 deletions

View File

@ -1,168 +1,338 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
/**
* Created by Arbel Deutsch Peled on 03-Mar-16.
* This is a full-fledged implementation of a Bulletin Board Client
* It provides asynchronous access to several remote servers, as well as a local cache
* Read/write operations are performed on the local server
* Read operations are performed on the local server
* Batch reads are performed on the local server and, if they fail, also on the remote servers
* Write operations are performed first on the local server and then on the remotes
* After any read is carried out, a subscription is made for the specific query to make sure the local DB will be updated
* The database also employs a synchronizer which makes sure local data is sent to the remote servers
*/
public class CachedBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient {
public class CachedBulletinBoardClient implements SubscriptionBulletinBoardClient {
private final BulletinBoardClient localClient;
private final AsyncBulletinBoardClient localClient;
private AsyncBulletinBoardClient remoteClient;
private BulletinBoardSubscriber subscriber;
private final int threadPoolSize;
private final long failDelayInMilliseconds;
private final long subscriptionIntervalInMilliseconds;
private class SubscriptionStoreCallback implements FutureCallback<List<BulletinBoardMessage>> {
public CachedBulletinBoardClient(BulletinBoardClient localClient,
int threadPoolSize,
long failDelayInMilliseconds,
long subscriptionIntervalInMilliseconds)
throws IllegalAccessException, InstantiationException {
private final FutureCallback<?> callback;
public SubscriptionStoreCallback(){
callback = null;
}
public SubscriptionStoreCallback(FutureCallback<?> callback){
this.callback = callback;
}
@Override
public void onSuccess(List<BulletinBoardMessage> result) {
for (BulletinBoardMessage msg : result) {
try {
localClient.postMessage(msg);
} catch (CommunicationException ignored) {
// TODO: log
}
}
}
@Override
public void onFailure(Throwable t) {
if (callback != null) {
callback.onFailure(t); // This is some hard error that cannot be dealt with
}
}
}
/**
* Creates a Cached Client
* Assumes all parameters are initialized
* @param localClient is a Client for the local instance
* @param remoteClient is a Client for the remote instance(s); Should have endless retries for post operations
* @param subscriber is a subscription service to the remote instance(s)
*/
public CachedBulletinBoardClient(AsyncBulletinBoardClient localClient,
AsyncBulletinBoardClient remoteClient,
BulletinBoardSubscriber subscriber) {
this.localClient = localClient;
this.threadPoolSize = threadPoolSize;
this.failDelayInMilliseconds = failDelayInMilliseconds;
this.subscriptionIntervalInMilliseconds = subscriptionIntervalInMilliseconds;
remoteClient = new ThreadedBulletinBoardClient();
this.remoteClient = remoteClient;
this.subscriber = subscriber;
}
@Override
public MessageID postMessage(BulletinBoardMessage msg, FutureCallback<Boolean> callback) {
return null;
}
public MessageID postMessage(final BulletinBoardMessage msg, final FutureCallback<Boolean> callback) {
@Override
public MessageID postBatch(CompleteBatch completeBatch, FutureCallback<Boolean> callback) {
return null;
}
return localClient.postMessage(msg, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postMessage(msg, callback);
}
@Override
public void beginBatch(BeginBatchMessage beginBatchMessage, FutureCallback<Boolean> callback) {
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, int startPosition, FutureCallback<Boolean> callback) {
public MessageID postBatch(final CompleteBatch completeBatch, final FutureCallback<Boolean> callback) {
return localClient.postBatch(completeBatch, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatch(completeBatch, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(byte[] signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
public void beginBatch(final BeginBatchMessage beginBatchMessage, final FutureCallback<Boolean> callback) {
localClient.beginBatch(beginBatchMessage, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.beginBatch(beginBatchMessage, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, int startPosition, FutureCallback<Boolean> callback) {
public void postBatchData(final byte[] signerId, final int batchId, final List<BatchData> batchDataList,
final int startPosition, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(ByteString signerId, int batchId, List<BatchData> batchDataList, FutureCallback<Boolean> callback) {
public void postBatchData(final byte[] signerId, final int batchId, final List<BatchData> batchDataList, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void closeBatch(CloseBatchMessage closeBatchMessage, FutureCallback<Boolean> callback) {
public void postBatchData(final ByteString signerId, final int batchId, final List<BatchData> batchDataList,
final int startPosition, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, startPosition, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, startPosition, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void postBatchData(final ByteString signerId, final int batchId, final List<BatchData> batchDataList, final FutureCallback<Boolean> callback) {
localClient.postBatchData(signerId, batchId, batchDataList, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.postBatchData(signerId, batchId, batchDataList, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void closeBatch(final CloseBatchMessage closeBatchMessage, final FutureCallback<Boolean> callback) {
localClient.closeBatch(closeBatchMessage, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
remoteClient.closeBatch(closeBatchMessage, callback);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public void getRedundancy(MessageID id, FutureCallback<Float> callback) {
}
@Override
public void readMessages(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
remoteClient.getRedundancy(id, callback);
}
@Override
public void readBatch(BatchSpecificationMessage batchSpecificationMessage, FutureCallback<CompleteBatch> callback) {
public void readMessages(MessageFilterList filterList, final FutureCallback<List<BulletinBoardMessage>> callback) {
localClient.readMessages(filterList, callback);
subscriber.subscribe(filterList, new SubscriptionStoreCallback(callback));
}
@Override
public void readBatch(final BatchSpecificationMessage batchSpecificationMessage, final FutureCallback<CompleteBatch> callback) {
localClient.readBatch(batchSpecificationMessage, new FutureCallback<CompleteBatch>() {
@Override
public void onSuccess(CompleteBatch result) {
callback.onSuccess(result); // Read from local client was successful
}
@Override
public void onFailure(Throwable t) {
// Read from local unsuccessful: try to read from remote
remoteClient.readBatch(batchSpecificationMessage, new FutureCallback<CompleteBatch>() {
@Override
public void onSuccess(CompleteBatch result) {
// Read from remote was successful: store in local and return result
localClient.postBatch(result, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {}
@Override
public void onFailure(Throwable t) {}
});
callback.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
// Read from remote was unsuccessful: report error
callback.onFailure(t);
}
});
}
});
}
@Override
public void querySync(SyncQuery syncQuery, FutureCallback<SyncQueryResponse> callback) {
localClient.querySync(syncQuery, callback);
}
@Override
public void init(BulletinBoardClientParams clientParams) {
remoteClient.init(clientParams);
ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolSize));
List<SubscriptionAsyncBulletinBoardClient> subscriberClients = new ArrayList<>(clientParams.getBulletinBoardAddressCount());
for (String address : clientParams.getBulletinBoardAddressList()){
SubscriptionAsyncBulletinBoardClient newClient =
new SingleServerBulletinBoardClient(executorService, failDelayInMilliseconds, subscriptionIntervalInMilliseconds);
newClient.init(clientParams.toBuilder().clearBulletinBoardAddress().addBulletinBoardAddress(address).build());
subscriberClients.add(newClient);
}
subscriber = new ThreadedBulletinBoardSubscriber(subscriberClients, localClient);
}
/**
* This is a stub method
* All resources are assumed to be initialized
*/
public void init(BulletinBoardClientParams clientParams) {}
@Override
public MessageID postMessage(BulletinBoardMessage msg) throws CommunicationException {
return null;
localClient.postMessage(msg);
return remoteClient.postMessage(msg);
}
@Override
public float getRedundancy(MessageID id) {
return 0;
return remoteClient.getRedundancy(id);
}
@Override
public List<BulletinBoardMessage> readMessages(MessageFilterList filterList) {
return null;
subscriber.subscribe(filterList, new SubscriptionStoreCallback());
return localClient.readMessages(filterList);
}
@Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException {
return null;
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
return localClient.generateSyncQuery(generateSyncQueryParams);
}
@Override
public void close() {
localClient.close();
remoteClient.close();
}
@Override
public void subscribe(MessageFilterList filterList, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, callback);
}
@Override
public void subscribe(MessageFilterList filterList, long startEntry, FutureCallback<List<BulletinBoardMessage>> callback) {
subscriber.subscribe(filterList, startEntry, callback);
}
public int syncStatus(){
return 0;
}
public void reSync(){
}
}
}

View File

@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit;
/**
* Created by Arbel Deutsch Peled on 15-Mar-16.
* This client is to be used mainly for testing.
* It wraps a BulletinBoardServer in an asynchronous client.
* This client wraps a BulletinBoardServer in an asynchronous client.
* It is meant to be used as a local cache handler and for testing purposes.
* This means the access to the server is direct (via method calls) instead of through a TCP connection.
* The client implements both synchronous and asynchronous method calls, but calls to the server itself are performed synchronously.
*/
public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient{
public class LocalBulletinBoardClient implements DeletableSubscriptionBulletinBoardClient {
private final BulletinBoardServer server;
private final DeletableBulletinBoardServer server;
private final ListeningScheduledExecutorService executorService;
private final BatchDigest digest;
private final int subsrciptionDelay;
@ -40,7 +40,7 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC
* @param threadNum is the number of concurrent threads to allocate for the client
* @param subscriptionDelay is the required delay between subscription calls in milliseconds
*/
public LocalBulletinBoardClient(BulletinBoardServer server, int threadNum, int subscriptionDelay) {
public LocalBulletinBoardClient(DeletableBulletinBoardServer server, int threadNum, int subscriptionDelay) {
this.server = server;
this.executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadNum));
this.digest = new GenericBatchDigest(new SHA256Digest());
@ -517,8 +517,30 @@ public class LocalBulletinBoardClient implements SubscriptionAsyncBulletinBoardC
}
@Override
public SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException {
return server.generateSyncQuery(GenerateSyncQueryParams);
public SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException {
return server.generateSyncQuery(generateSyncQueryParams);
}
@Override
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(msgID).getValue());
} catch (CommunicationException e) {
callback.onFailure(e);
}
}
@Override
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback) {
try {
callback.onSuccess(server.deleteMessage(entryNum).getValue());
} catch (CommunicationException e) {
callback.onFailure(e);
}
}
@Override

View File

@ -7,12 +7,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import meerkat.bulletinboard.workers.singleserver.*;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Voting.BulletinBoardClientParams;
import meerkat.util.BulletinBoardUtils;
import javax.ws.rs.NotFoundException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
@ -30,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* If the list of servers contains more than one server: the server actually used is the first one
* The class further implements a delayed access to the server after a communication error occurs
*/
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionAsyncBulletinBoardClient {
public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient implements SubscriptionBulletinBoardClient {
private final int MAX_RETRIES = 11;

View File

@ -8,7 +8,6 @@ import meerkat.util.BulletinBoardUtils;
import static meerkat.protobuf.BulletinBoardAPI.FilterType.*;
import java.sql.Time;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@ -19,11 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber {
protected final Collection<SubscriptionAsyncBulletinBoardClient> clients;
protected final Collection<SubscriptionBulletinBoardClient> clients;
protected final BulletinBoardClient localClient;
protected Iterator<SubscriptionAsyncBulletinBoardClient> clientIterator;
protected SubscriptionAsyncBulletinBoardClient currentClient;
protected Iterator<SubscriptionBulletinBoardClient> clientIterator;
protected SubscriptionBulletinBoardClient currentClient;
private long lastServerSwitchTime;
@ -32,7 +31,7 @@ public class ThreadedBulletinBoardSubscriber implements BulletinBoardSubscriber
private static final Float[] BREAKPOINTS = {0.5f, 0.75f, 0.9f, 0.95f, 0.99f, 0.999f};
public ThreadedBulletinBoardSubscriber(Collection<SubscriptionAsyncBulletinBoardClient> clients, BulletinBoardClient localClient) {
public ThreadedBulletinBoardSubscriber(Collection<SubscriptionBulletinBoardClient> clients, BulletinBoardClient localClient) {
this.clients = clients;
this.localClient = localClient;

View File

@ -17,7 +17,6 @@ import java.util.*;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@ -38,7 +37,7 @@ public class GenericSubscriptionClientTester {
private static String CERT1_PEM_EXAMPLE = "/certs/enduser-certs/user1.crt";
private static String CERT3_PEM_EXAMPLE = "/certs/enduser-certs/user3.crt";
private SubscriptionAsyncBulletinBoardClient bulletinBoardClient;
private SubscriptionBulletinBoardClient bulletinBoardClient;
private Random random;
private BulletinBoardMessageGenerator generator;
@ -46,7 +45,7 @@ public class GenericSubscriptionClientTester {
private Semaphore jobSemaphore;
private Vector<Throwable> thrown;
public GenericSubscriptionClientTester(SubscriptionAsyncBulletinBoardClient bulletinBoardClient){
public GenericSubscriptionClientTester(SubscriptionBulletinBoardClient bulletinBoardClient){
this.bulletinBoardClient = bulletinBoardClient;

View File

@ -48,7 +48,7 @@ public class LocalBulletinBoardClientTest {
throw new CommunicationException(e.getCause() + " " + e.getMessage());
}
BulletinBoardServer server = new BulletinBoardSQLServer(queryProvider);
DeletableBulletinBoardServer server = new BulletinBoardSQLServer(queryProvider);
server.init(DB_NAME);
LocalBulletinBoardClient client = new LocalBulletinBoardClient(server, THREAD_NUM, SUBSRCIPTION_DELAY);

View File

@ -6,6 +6,7 @@ import java.util.*;
import com.google.protobuf.*;
import com.google.protobuf.Timestamp;
import com.sun.org.apache.xpath.internal.operations.Bool;
import meerkat.bulletinboard.*;
import meerkat.bulletinboard.sqlserver.mappers.*;
import static meerkat.bulletinboard.BulletinBoardConstants.*;
@ -16,6 +17,7 @@ import meerkat.comm.MessageOutputStream;
import meerkat.crypto.concrete.ECDSASignature;
import meerkat.crypto.concrete.SHA256Digest;
import meerkat.protobuf.BulletinBoardAPI;
import meerkat.protobuf.BulletinBoardAPI.*;
import meerkat.protobuf.Crypto.Signature;
import meerkat.protobuf.Crypto.SignatureVerificationKey;
@ -38,7 +40,7 @@ import org.springframework.jdbc.support.KeyHolder;
/**
* This is a generic SQL implementation of the BulletinBoardServer API.
*/
public class BulletinBoardSQLServer implements BulletinBoardServer{
public class BulletinBoardSQLServer implements DeletableBulletinBoardServer{
/**
* This interface provides the required implementation-specific data to enable an access to an actual SQL server.
@ -67,6 +69,16 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
new int[] {Types.BLOB, Types.TIMESTAMP, Types.BLOB}
),
DELETE_MSG_BY_ENTRY(
new String[] {"EntryNum"},
new int[] {Types.INTEGER}
),
DELETE_MSG_BY_ID(
new String[] {"MsgId"},
new int[] {Types.BLOB}
),
INSERT_NEW_TAG(
new String[] {"Tag"},
new int[] {Types.VARCHAR}
@ -529,6 +541,38 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
return postMessage(msg, true); // Perform a post and check the signature for authenticity
}
@Override
public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException {
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ID);
Map namedParameters = new HashMap();
namedParameters.put(QueryType.DELETE_MSG_BY_ID.getParamName(0),msgID);
int affectedRows = jdbcTemplate.update(sql, namedParameters);
//TODO: Log
return BoolMsg.newBuilder().setValue(affectedRows > 0).build();
}
@Override
public BoolMsg deleteMessage(long entryNum) throws CommunicationException {
String sql = sqlQueryProvider.getSQLString(QueryType.DELETE_MSG_BY_ENTRY);
Map namedParameters = new HashMap();
namedParameters.put(QueryType.DELETE_MSG_BY_ENTRY.getParamName(0),entryNum);
int affectedRows = jdbcTemplate.update(sql, namedParameters);
//TODO: Log
return BoolMsg.newBuilder().setValue(affectedRows > 0).build();
}
/**
* This is a container class for and SQL string builder and a MapSqlParameterSource to be used with it

View File

@ -30,19 +30,28 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
switch(queryType) {
case ADD_SIGNATURE:
return "INSERT INTO SignatureTable (EntryNum, SignerId, Signature)"
+ " SELECT DISTINCT :EntryNum AS Entry, :SignerId AS Id, :Signature AS Sig FROM UtilityTable AS Temp"
+ " WHERE NOT EXISTS"
+ " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.SignerId = :SignerId AND SubTable.EntryNum = :EntryNum)";
return MessageFormat.format(
"INSERT INTO SignatureTable (EntryNum, SignerId, Signature)"
+ " SELECT DISTINCT :{0} AS Entry, :{1} AS Id, :{2} AS Sig FROM UtilityTable AS Temp"
+ " WHERE NOT EXISTS"
+ " (SELECT 1 FROM SignatureTable AS SubTable WHERE SubTable.EntryNum = :{0} AND SubTable.SignerId = :{1})",
QueryType.ADD_SIGNATURE.getParamName(0),
QueryType.ADD_SIGNATURE.getParamName(1),
QueryType.ADD_SIGNATURE.getParamName(2));
case CONNECT_TAG:
return "INSERT INTO MsgTagTable (TagId, EntryNum)"
+ " SELECT DISTINCT TagTable.TagId, :EntryNum AS NewEntry FROM TagTable WHERE Tag = :Tag"
+ " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId"
+ " AND SubTable.EntryNum = :EntryNum)";
return MessageFormat.format(
"INSERT INTO MsgTagTable (TagId, EntryNum)"
+ " SELECT DISTINCT TagTable.TagId, :{0} AS NewEntry FROM TagTable WHERE Tag = :{1}"
+ " AND NOT EXISTS (SELECT 1 FROM MsgTagTable AS SubTable WHERE SubTable.TagId = TagTable.TagId"
+ " AND SubTable.EntryNum = :{0})",
QueryType.CONNECT_TAG.getParamName(0),
QueryType.CONNECT_TAG.getParamName(1));
case FIND_MSG_ID:
return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId";
return MessageFormat.format(
"SELECT EntryNum From MsgTable WHERE MsgId = :{0}",
QueryType.FIND_MSG_ID.getParamName(0));
case FIND_TAG_ID:
return MessageFormat.format(
@ -59,14 +68,32 @@ public class H2QueryProvider implements BulletinBoardSQLServer.SQLQueryProvider
return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable";
case GET_SIGNATURES:
return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum";
return MessageFormat.format(
"SELECT Signature FROM SignatureTable WHERE EntryNum = :{0}",
QueryType.GET_SIGNATURES.getParamName(0));
case INSERT_MSG:
return "INSERT INTO MsgTable (MsgId, Msg, ExactTime) VALUES(:MsgId,:Msg,:TimeStamp)";
return MessageFormat.format(
"INSERT INTO MsgTable (MsgId, ExactTime, Msg) VALUES(:{0}, :{1}, :{2})",
QueryType.INSERT_MSG.getParamName(0),
QueryType.INSERT_MSG.getParamName(1),
QueryType.INSERT_MSG.getParamName(2));
case DELETE_MSG_BY_ENTRY:
return MessageFormat.format(
"DELETE FROM MsgTable WHERE EntryNum = :{0}",
QueryType.DELETE_MSG_BY_ENTRY.getParamName(0));
case DELETE_MSG_BY_ID:
return MessageFormat.format(
"DELETE FROM MsgTable WHERE MsgId = :{0}",
QueryType.DELETE_MSG_BY_ID.getParamName(0));
case INSERT_NEW_TAG:
return "INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE"
+ " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :Tag)";
return MessageFormat.format(
"INSERT INTO TagTable(Tag) SELECT DISTINCT :Tag AS NewTag FROM UtilityTable WHERE"
+ " NOT EXISTS (SELECT 1 FROM TagTable AS SubTable WHERE SubTable.Tag = :{0})",
QueryType.INSERT_NEW_TAG.getParamName(0));
case GET_LAST_MESSAGE_ENTRY:
return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable";

View File

@ -1,7 +1,5 @@
package meerkat.bulletinboard.sqlserver;
import com.mysql.jdbc.jdbc2.optional.MysqlDataSource;
import meerkat.bulletinboard.BulletinBoardConstants;
import meerkat.bulletinboard.sqlserver.BulletinBoardSQLServer.SQLQueryProvider;
import meerkat.protobuf.BulletinBoardAPI.FilterType;
import org.apache.commons.dbcp2.BasicDataSource;
@ -81,6 +79,16 @@ public class MySQLQueryProvider implements SQLQueryProvider {
QueryType.INSERT_MSG.getParamName(1),
QueryType.INSERT_MSG.getParamName(2));
case DELETE_MSG_BY_ENTRY:
return MessageFormat.format(
"DELETE IGNORE FROM MsgTable WHERE EntryNum = :{0}",
QueryType.DELETE_MSG_BY_ENTRY.getParamName(0));
case DELETE_MSG_BY_ID:
return MessageFormat.format(
"DELETE IGNORE FROM MsgTable WHERE MsgId = :{0}",
QueryType.DELETE_MSG_BY_ID.getParamName(0));
case INSERT_NEW_TAG:
return MessageFormat.format(
"INSERT IGNORE INTO TagTable(Tag) VALUES (:{0})",

View File

@ -47,12 +47,12 @@ public interface BulletinBoardClient {
/**
* Create a SyncQuery to test against that corresponds with the current server state for a specific filter list
* Should only be called on instances for which the actual server contacted is known (i.e. there is only one server)
* @param GenerateSyncQueryParams defines the required information needed to generate the query
* @param generateSyncQueryParams defines the required information needed to generate the query
* These are represented as fractions of the total number of relevant messages
* @return The generated SyncQuery
* @throws CommunicationException when no DB can be contacted
*/
SyncQuery generateSyncQuery(GenerateSyncQueryParams GenerateSyncQueryParams) throws CommunicationException;
SyncQuery generateSyncQuery(GenerateSyncQueryParams generateSyncQueryParams) throws CommunicationException;
/**
* Closes all connections, if any.

View File

@ -0,0 +1,30 @@
package meerkat.bulletinboard;
import com.google.common.util.concurrent.FutureCallback;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
* This interface is meant to extend a BulletinBoardClient interface/class
* It provides it with the ability to delete messages from the Server
* This assumes the Server implements the {@link DeletableBulletinBoardServer}
*/
public interface BulletinBoardMessageDeleter {
/**
* Deletes a message from a Bulletin Board Server
* @param msgID is the ID of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(MessageID msgID, FutureCallback<Boolean> callback);
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @param callback handles the result of the operation
*/
public void deleteMessage(long entryNum, FutureCallback<Boolean> callback);
}

View File

@ -0,0 +1,29 @@
package meerkat.bulletinboard;
import meerkat.comm.CommunicationException;
import meerkat.protobuf.BulletinBoardAPI.*;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
*/
public interface DeletableBulletinBoardServer extends BulletinBoardServer {
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param msgID is the ID of the message to delete
* @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist
* @throws CommunicationException in case of an error
*/
public BoolMsg deleteMessage(MessageID msgID) throws CommunicationException;
/**
* Deletes a message from the Bulletin Board
* Logs this action
* @param entryNum is the serial entry number of the message to delete
* @return a BoolMsg containing the value TRUE if a message was deleted, FALSE if the message does not exist
* @throws CommunicationException in case of an error
*/
public BoolMsg deleteMessage(long entryNum) throws CommunicationException;
}

View File

@ -0,0 +1,7 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 13-Apr-16.
*/
public interface DeletableSubscriptionBulletinBoardClient extends SubscriptionBulletinBoardClient, BulletinBoardMessageDeleter {
}

View File

@ -1,7 +0,0 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 03-Mar-16.
*/
public interface SubscriptionAsyncBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber {
}

View File

@ -0,0 +1,7 @@
package meerkat.bulletinboard;
/**
* Created by Arbel Deutsch Peled on 03-Mar-16.
*/
public interface SubscriptionBulletinBoardClient extends AsyncBulletinBoardClient, BulletinBoardSubscriber {
}