Merge branch 'Bulletin-Board-Batch'
commit
50bcca8da3
|
@ -381,11 +381,7 @@ public class SingleServerBulletinBoardClient extends SimpleBulletinBoardClient i
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Boolean msg) {
|
public void onSuccess(Boolean msg) {
|
||||||
closeBatch(
|
closeBatch(
|
||||||
CloseBatchMessage.newBuilder()
|
completeBatch.getCloseBatchMessage(),
|
||||||
.setBatchId(completeBatch.getBeginBatchMessage().getBatchId())
|
|
||||||
.setSig(completeBatch.getSignature())
|
|
||||||
.setBatchLength(completeBatch.getBatchDataList().size())
|
|
||||||
.build(),
|
|
||||||
callback
|
callback
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package meerkat.bulletinboard.workers.singleserver;
|
||||||
import meerkat.bulletinboard.CompleteBatch;
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
import meerkat.bulletinboard.SingleServerWorker;
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
|
import meerkat.comm.MessageInputStream;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.rest.Constants;
|
import meerkat.rest.Constants;
|
||||||
|
|
||||||
|
@ -12,6 +13,9 @@ import javax.ws.rs.client.Entity;
|
||||||
import javax.ws.rs.client.WebTarget;
|
import javax.ws.rs.client.WebTarget;
|
||||||
import javax.ws.rs.core.GenericType;
|
import javax.ws.rs.core.GenericType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||||
|
@ -40,29 +44,33 @@ public class SingleServerReadBatchWorker extends SingleServerWorker<BatchSpecifi
|
||||||
Client client = clientLocal.get();
|
Client client = clientLocal.get();
|
||||||
|
|
||||||
WebTarget webTarget;
|
WebTarget webTarget;
|
||||||
Response response;
|
|
||||||
|
|
||||||
// Get the batch data
|
// Get the batch data
|
||||||
|
|
||||||
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH);
|
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_BATCH_PATH);
|
||||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
InputStream in = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF), InputStream.class);
|
||||||
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
|
||||||
|
|
||||||
// Retrieve answer
|
MessageInputStream<BatchData> inputStream = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// If a BatchDataList is returned: the read was successful
|
inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BatchData.class);
|
||||||
return response.readEntity(BatchDataList.class).getDataList();
|
|
||||||
|
|
||||||
} catch (ProcessingException | IllegalStateException e) {
|
return inputStream.asList();
|
||||||
|
|
||||||
|
} catch (IOException | InvocationTargetException e) {
|
||||||
|
|
||||||
// Read failed
|
// Read failed
|
||||||
throw new CommunicationException("Could not contact the server");
|
throw new CommunicationException("Could not contact the server or server returned illegal result");
|
||||||
|
|
||||||
}
|
} catch (NoSuchMethodException | IllegalAccessException e) {
|
||||||
finally {
|
|
||||||
response.close();
|
throw new CommunicationException("MessageInputStream error");
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
inputStream.close();
|
||||||
|
} catch (IOException ignored) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,8 @@ package meerkat.bulletinboard.workers.singleserver;
|
||||||
|
|
||||||
import meerkat.bulletinboard.SingleServerWorker;
|
import meerkat.bulletinboard.SingleServerWorker;
|
||||||
import meerkat.comm.CommunicationException;
|
import meerkat.comm.CommunicationException;
|
||||||
|
import meerkat.comm.MessageInputStream;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList;
|
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessageList;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.MessageFilterList;
|
import meerkat.protobuf.BulletinBoardAPI.MessageFilterList;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
||||||
|
@ -13,6 +15,9 @@ import javax.ws.rs.client.Entity;
|
||||||
import javax.ws.rs.client.WebTarget;
|
import javax.ws.rs.client.WebTarget;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
import static meerkat.bulletinboard.BulletinBoardConstants.BULLETIN_BOARD_SERVER_PATH;
|
||||||
|
@ -38,30 +43,33 @@ public class SingleServerReadMessagesWorker extends SingleServerWorker<MessageFi
|
||||||
Client client = clientLocal.get();
|
Client client = clientLocal.get();
|
||||||
|
|
||||||
WebTarget webTarget;
|
WebTarget webTarget;
|
||||||
Response response;
|
|
||||||
|
|
||||||
// Send request to Server
|
// Send request to Server
|
||||||
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
|
webTarget = client.target(serverAddress).path(BULLETIN_BOARD_SERVER_PATH).path(READ_MESSAGES_PATH);
|
||||||
response = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(
|
InputStream in = webTarget.request(Constants.MEDIATYPE_PROTOBUF).post(Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF), InputStream.class);
|
||||||
Entity.entity(payload, Constants.MEDIATYPE_PROTOBUF));
|
|
||||||
|
|
||||||
// Retrieve answer
|
MessageInputStream<BulletinBoardMessage> inputStream = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// If a BulletinBoardMessageList is returned: the read was successful
|
inputStream = MessageInputStream.MessageInputStreamFactory.createMessageInputStream(in, BulletinBoardMessage.class);
|
||||||
return response.readEntity(BulletinBoardMessageList.class).getMessageList();
|
|
||||||
|
|
||||||
} catch (ProcessingException | IllegalStateException e) {
|
return inputStream.asList();
|
||||||
|
|
||||||
|
} catch (IOException | InvocationTargetException e) {
|
||||||
|
|
||||||
// Read failed
|
// Read failed
|
||||||
throw new CommunicationException("Could not contact the server");
|
throw new CommunicationException("Could not contact the server or server returned illegal result");
|
||||||
|
|
||||||
}
|
} catch (NoSuchMethodException | IllegalAccessException e) {
|
||||||
finally {
|
|
||||||
response.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
throw new CommunicationException("MessageInputStream error");
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
inputStream.close();
|
||||||
|
} catch (IOException ignored) {}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.*;
|
||||||
|
import com.google.protobuf.Timestamp;
|
||||||
import meerkat.bulletinboard.AsyncBulletinBoardClient;
|
import meerkat.bulletinboard.AsyncBulletinBoardClient;
|
||||||
import meerkat.bulletinboard.CompleteBatch;
|
import meerkat.bulletinboard.CompleteBatch;
|
||||||
import meerkat.bulletinboard.GenericBatchDigitalSignature;
|
import meerkat.bulletinboard.GenericBatchDigitalSignature;
|
||||||
|
@ -284,6 +285,11 @@ public class ThreadedBulletinBoardClientIntegrationTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
completeBatch.setTimestamp(Timestamp.newBuilder()
|
||||||
|
.setSeconds(Math.abs(90))
|
||||||
|
.setNanos(50)
|
||||||
|
.build());
|
||||||
|
|
||||||
signers[signer].updateContent(completeBatch);
|
signers[signer].updateContent(completeBatch);
|
||||||
|
|
||||||
completeBatch.setSignature(signers[signer].sign());
|
completeBatch.setSignature(signers[signer].sign());
|
||||||
|
@ -357,6 +363,10 @@ public class ThreadedBulletinBoardClientIntegrationTest {
|
||||||
.addTag("Signature")
|
.addTag("Signature")
|
||||||
.addTag("Trustee")
|
.addTag("Trustee")
|
||||||
.setData(ByteString.copyFrom(b1))
|
.setData(ByteString.copyFrom(b1))
|
||||||
|
.setTimestamp(Timestamp.newBuilder()
|
||||||
|
.setSeconds(20)
|
||||||
|
.setNanos(30)
|
||||||
|
.build())
|
||||||
.build())
|
.build())
|
||||||
.addSig(Crypto.Signature.newBuilder()
|
.addSig(Crypto.Signature.newBuilder()
|
||||||
.setType(Crypto.SignatureType.DSA)
|
.setType(Crypto.SignatureType.DSA)
|
||||||
|
@ -440,6 +450,10 @@ public class ThreadedBulletinBoardClientIntegrationTest {
|
||||||
CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder()
|
CloseBatchMessage closeBatchMessage = CloseBatchMessage.newBuilder()
|
||||||
.setBatchId(BATCH_ID)
|
.setBatchId(BATCH_ID)
|
||||||
.setBatchLength(BATCH_LENGTH)
|
.setBatchLength(BATCH_LENGTH)
|
||||||
|
.setTimestamp(Timestamp.newBuilder()
|
||||||
|
.setSeconds(50)
|
||||||
|
.setNanos(80)
|
||||||
|
.build())
|
||||||
.setSig(completeBatch.getSignature())
|
.setSig(completeBatch.getSignature())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -525,6 +539,10 @@ public class ThreadedBulletinBoardClientIntegrationTest {
|
||||||
.setBatchId(NON_EXISTENT_BATCH_ID)
|
.setBatchId(NON_EXISTENT_BATCH_ID)
|
||||||
.setBatchLength(1)
|
.setBatchLength(1)
|
||||||
.setSig(Crypto.Signature.getDefaultInstance())
|
.setSig(Crypto.Signature.getDefaultInstance())
|
||||||
|
.setTimestamp(Timestamp.newBuilder()
|
||||||
|
.setSeconds(9)
|
||||||
|
.setNanos(12)
|
||||||
|
.build())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Try to close the (unopened) batch;
|
// Try to close the (unopened) batch;
|
||||||
|
|
|
@ -446,9 +446,10 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
|
|
||||||
sql = sqlQueryProvider.getSQLString(QueryType.FIND_MSG_ID);
|
sql = sqlQueryProvider.getSQLString(QueryType.FIND_MSG_ID);
|
||||||
Map namedParameters = new HashMap();
|
Map namedParameters = new HashMap();
|
||||||
|
|
||||||
namedParameters.put(QueryType.FIND_MSG_ID.getParamName(0),msgID);
|
namedParameters.put(QueryType.FIND_MSG_ID.getParamName(0),msgID);
|
||||||
|
|
||||||
List<Long> entryNums = jdbcTemplate.query(sql, new MapSqlParameterSource(namedParameters), new LongMapper());
|
List<Long> entryNums = jdbcTemplate.query(sql, namedParameters, new LongMapper());
|
||||||
|
|
||||||
if (entryNums.size() > 0){
|
if (entryNums.size() > 0){
|
||||||
|
|
||||||
|
@ -462,7 +463,7 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
namedParameters.put(QueryType.INSERT_MSG.getParamName(2), msg.getMsg().toByteArray());
|
namedParameters.put(QueryType.INSERT_MSG.getParamName(2), msg.getMsg().toByteArray());
|
||||||
|
|
||||||
KeyHolder keyHolder = new GeneratedKeyHolder();
|
KeyHolder keyHolder = new GeneratedKeyHolder();
|
||||||
jdbcTemplate.update(sql,new MapSqlParameterSource(namedParameters),keyHolder);
|
jdbcTemplate.update(sql, new MapSqlParameterSource(namedParameters), keyHolder);
|
||||||
|
|
||||||
entryNum = keyHolder.getKey().longValue();
|
entryNum = keyHolder.getKey().longValue();
|
||||||
|
|
||||||
|
@ -785,6 +786,9 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Add timestamp to CompleteBatch
|
||||||
|
completeBatch.setTimestamp(message.getTimestamp());
|
||||||
|
|
||||||
// Add actual batch data to CompleteBatch
|
// Add actual batch data to CompleteBatch
|
||||||
|
|
||||||
sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
sql = sqlQueryProvider.getSQLString(QueryType.GET_BATCH_MESSAGE_DATA);
|
||||||
|
@ -903,20 +907,20 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
|
|
||||||
long lastEntryNum = getLastMessageEntry();
|
long lastEntryNum = getLastMessageEntry();
|
||||||
|
|
||||||
long checksum = 0;
|
|
||||||
|
|
||||||
Iterator<SingleSyncQuery> queryIterator = syncQuery.getQueryList().iterator();
|
Iterator<SingleSyncQuery> queryIterator = syncQuery.getQueryList().iterator();
|
||||||
|
|
||||||
SingleSyncQuery currentQuery = queryIterator.next();
|
SingleSyncQuery currentQuery = queryIterator.next();
|
||||||
|
|
||||||
List<BulletinBoardMessage> messageStubs = readMessageStubs(syncQuery.getFilterList());
|
List<BulletinBoardMessage> messageStubs = readMessageStubs(syncQuery.getFilterList());
|
||||||
|
|
||||||
|
Checksum checksum = new SimpleChecksum();
|
||||||
|
|
||||||
for (BulletinBoardMessage message : messageStubs){
|
for (BulletinBoardMessage message : messageStubs){
|
||||||
|
|
||||||
// Check for end of current query
|
// Check for end of current query
|
||||||
if (timestampComparator.compare(message.getMsg().getTimestamp(), currentQuery.getTimeOfSync()) > 0){
|
if (timestampComparator.compare(message.getMsg().getTimestamp(), currentQuery.getTimeOfSync()) > 0){
|
||||||
|
|
||||||
if (checksum == currentQuery.getChecksum()){
|
if (checksum.getChecksum() == currentQuery.getChecksum()){
|
||||||
lastTimeOfSync = currentQuery.getTimeOfSync();
|
lastTimeOfSync = currentQuery.getTimeOfSync();
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -932,13 +936,13 @@ public class BulletinBoardSQLServer implements BulletinBoardServer{
|
||||||
|
|
||||||
// Advance checksum
|
// Advance checksum
|
||||||
|
|
||||||
ByteString messageID = message.getMsg().getData();
|
ByteString messageID = message.getMsg().getData(); // The data field contains the message ID
|
||||||
|
|
||||||
checksum &= messageID.byteAt(0) & messageID.byteAt(1) & messageID.byteAt(2) & messageID.byteAt(3);
|
checksum.update(messageID);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checksum == currentQuery.getChecksum()){
|
if (checksum.getChecksum() == currentQuery.getChecksum()){
|
||||||
lastTimeOfSync = currentQuery.getTimeOfSync();
|
lastTimeOfSync = currentQuery.getTimeOfSync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import org.sqlite.SQLiteDataSource;
|
import org.sqlite.SQLiteDataSource;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
|
import java.text.MessageFormat;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -25,19 +26,97 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi
|
||||||
switch(queryType) {
|
switch(queryType) {
|
||||||
case ADD_SIGNATURE:
|
case ADD_SIGNATURE:
|
||||||
return "INSERT OR IGNORE INTO SignatureTable (EntryNum, SignerId, Signature) VALUES (:EntryNum,:SignerId,:Signature)";
|
return "INSERT OR IGNORE INTO SignatureTable (EntryNum, SignerId, Signature) VALUES (:EntryNum,:SignerId,:Signature)";
|
||||||
|
|
||||||
case CONNECT_TAG:
|
case CONNECT_TAG:
|
||||||
return "INSERT OR IGNORE INTO MsgTagTable (TagId, EntryNum)"
|
return "INSERT OR IGNORE INTO MsgTagTable (TagId, EntryNum)"
|
||||||
+ " SELECT TagTable.TagId, :EntryNum AS EntryNum FROM TagTable WHERE Tag = :Tag";
|
+ " SELECT TagTable.TagId, :EntryNum AS EntryNum FROM TagTable WHERE Tag = :Tag";
|
||||||
|
|
||||||
case FIND_MSG_ID:
|
case FIND_MSG_ID:
|
||||||
return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId";
|
return "SELECT EntryNum From MsgTable WHERE MsgId = :MsgId";
|
||||||
|
|
||||||
|
case FIND_TAG_ID:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"SELECT TagId FROM TagTable WHERE Tag = :{0}",
|
||||||
|
QueryType.FIND_TAG_ID.getParamName(0));
|
||||||
|
|
||||||
case GET_MESSAGES:
|
case GET_MESSAGES:
|
||||||
return "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable";
|
return "SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable";
|
||||||
|
|
||||||
|
case COUNT_MESSAGES:
|
||||||
|
return "SELECT COUNT(MsgTable.EntryNum) FROM MsgTable";
|
||||||
|
|
||||||
|
case GET_MESSAGE_STUBS:
|
||||||
|
return "SELECT MsgTable.EntryNum, MsgTable.MsgId, MsgTable.ExactTime FROM MsgTable";
|
||||||
|
|
||||||
case GET_SIGNATURES:
|
case GET_SIGNATURES:
|
||||||
return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum";
|
return "SELECT Signature FROM SignatureTable WHERE EntryNum = :EntryNum";
|
||||||
|
|
||||||
case INSERT_MSG:
|
case INSERT_MSG:
|
||||||
return "INSERT INTO MsgTable (MsgId, Msg) VALUES(:MsgId,:Msg)";
|
return "INSERT INTO MsgTable (MsgId, Msg) VALUES(:MsgId,:Msg)";
|
||||||
|
|
||||||
case INSERT_NEW_TAG:
|
case INSERT_NEW_TAG:
|
||||||
return "INSERT OR IGNORE INTO TagTable(Tag) VALUES (:Tag)";
|
return "INSERT OR IGNORE INTO TagTable(Tag) VALUES (:Tag)";
|
||||||
|
|
||||||
|
case GET_LAST_MESSAGE_ENTRY:
|
||||||
|
return "SELECT MAX(MsgTable.EntryNum) FROM MsgTable";
|
||||||
|
|
||||||
|
case GET_BATCH_MESSAGE_ENTRY:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"SELECT MsgTable.EntryNum, MsgTable.Msg FROM MsgTable"
|
||||||
|
+ " INNER JOIN SignatureTable ON MsgTable.EntryNum = SignatureTable.EntryNum"
|
||||||
|
+ " INNER JOIN MsgTagTable ON MsgTable.EntryNum = MsgTagTable.EntryNum"
|
||||||
|
+ " INNER JOIN TagTable ON MsgTagTable.TagId = TagTable.TagId"
|
||||||
|
+ " WHERE SignatureTable.SignerId = :{0}"
|
||||||
|
+ " AND TagTable.Tag = :{1}",
|
||||||
|
QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(0),
|
||||||
|
QueryType.GET_BATCH_MESSAGE_ENTRY.getParamName(1));
|
||||||
|
|
||||||
|
case GET_BATCH_MESSAGE_DATA:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"SELECT Data FROM BatchTable"
|
||||||
|
+ " WHERE SignerId = :{0} AND BatchId = :{1} AND SerialNum >= :{2}"
|
||||||
|
+ " ORDER BY SerialNum ASC",
|
||||||
|
QueryType.GET_BATCH_MESSAGE_DATA.getParamName(0),
|
||||||
|
QueryType.GET_BATCH_MESSAGE_DATA.getParamName(1),
|
||||||
|
QueryType.GET_BATCH_MESSAGE_DATA.getParamName(2));
|
||||||
|
|
||||||
|
case INSERT_BATCH_DATA:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"INSERT INTO BatchTable (SignerId, BatchId, SerialNum, Data)"
|
||||||
|
+ " VALUES (:{0}, :{1}, :{2}, :{3})",
|
||||||
|
QueryType.INSERT_BATCH_DATA.getParamName(0),
|
||||||
|
QueryType.INSERT_BATCH_DATA.getParamName(1),
|
||||||
|
QueryType.INSERT_BATCH_DATA.getParamName(2),
|
||||||
|
QueryType.INSERT_BATCH_DATA.getParamName(3));
|
||||||
|
|
||||||
|
case CHECK_BATCH_LENGTH:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"SELECT COUNT(Data) AS BatchLength FROM BatchTable"
|
||||||
|
+ " WHERE SignerId = :{0} AND BatchId = :{1}",
|
||||||
|
QueryType.CHECK_BATCH_LENGTH.getParamName(0),
|
||||||
|
QueryType.CHECK_BATCH_LENGTH.getParamName(1));
|
||||||
|
|
||||||
|
case CONNECT_BATCH_TAG:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"INSERT INTO BatchTagTable (SignerId, BatchId, TagId) SELECT :{0}, :{1}, TagId FROM TagTable"
|
||||||
|
+ " WHERE Tag = :{2}",
|
||||||
|
QueryType.CONNECT_BATCH_TAG.getParamName(0),
|
||||||
|
QueryType.CONNECT_BATCH_TAG.getParamName(1),
|
||||||
|
QueryType.CONNECT_BATCH_TAG.getParamName(2));
|
||||||
|
|
||||||
|
case GET_BATCH_TAGS:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"SELECT Tag FROM TagTable INNER JOIN BatchTagTable ON TagTable.TagId = BatchTagTable.TagId"
|
||||||
|
+ " WHERE SignerId = :{0} AND BatchId = :{1} ORDER BY Tag ASC",
|
||||||
|
QueryType.GET_BATCH_TAGS.getParamName(0),
|
||||||
|
QueryType.GET_BATCH_TAGS.getParamName(1));
|
||||||
|
|
||||||
|
case REMOVE_BATCH_TAGS:
|
||||||
|
return MessageFormat.format(
|
||||||
|
"DELETE FROM BatchTagTable WHERE SignerId = :{0} AND BatchId = :{1}",
|
||||||
|
QueryType.REMOVE_BATCH_TAGS.getParamName(0),
|
||||||
|
QueryType.REMOVE_BATCH_TAGS.getParamName(1));
|
||||||
|
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Cannot serve a query of type " + queryType);
|
throw new IllegalArgumentException("Cannot serve a query of type " + queryType);
|
||||||
}
|
}
|
||||||
|
@ -52,21 +131,34 @@ public class SQLiteQueryProvider implements BulletinBoardSQLServer.SQLQueryProvi
|
||||||
switch(filterType) {
|
switch(filterType) {
|
||||||
case EXACT_ENTRY:
|
case EXACT_ENTRY:
|
||||||
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
return "MsgTable.EntryNum = :EntryNum" + serialString;
|
||||||
|
|
||||||
case MAX_ENTRY:
|
case MAX_ENTRY:
|
||||||
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
|
|
||||||
case MIN_ENTRY:
|
case MIN_ENTRY:
|
||||||
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
return "MsgTable.EntryNum <= :EntryNum" + serialString;
|
||||||
|
|
||||||
case MAX_MESSAGES:
|
case MAX_MESSAGES:
|
||||||
return "LIMIT = :Limit" + serialString;
|
return "LIMIT = :Limit" + serialString;
|
||||||
|
|
||||||
case MSG_ID:
|
case MSG_ID:
|
||||||
return "MsgTable.MsgId = :MsgId" + serialString;
|
return "MsgTable.MsgId = :MsgId" + serialString;
|
||||||
|
|
||||||
case SIGNER_ID:
|
case SIGNER_ID:
|
||||||
return "EXISTS (SELECT 1 FROM SignatureTable"
|
return "EXISTS (SELECT 1 FROM SignatureTable"
|
||||||
+ " WHERE SignatureTable.SignerId = :SignerId" + serialString + " AND SignatureTable.EntryNum = MsgTable.EntryNum)";
|
+ " WHERE SignatureTable.SignerId = :SignerId" + serialString + " AND SignatureTable.EntryNum = MsgTable.EntryNum)";
|
||||||
|
|
||||||
case TAG:
|
case TAG:
|
||||||
return "EXISTS (SELECT 1 FROM TagTable"
|
return "EXISTS (SELECT 1 FROM TagTable"
|
||||||
+ " INNER JOIN MsgTagTable ON TagTable.TagId = MsgTagTable.TagId"
|
+ " INNER JOIN MsgTagTable ON TagTable.TagId = MsgTagTable.TagId"
|
||||||
+ " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)";
|
+ " WHERE TagTable.Tag = :Tag" + serialString + " AND MsgTagTable.EntryNum = MsgTable.EntryNum)";
|
||||||
|
|
||||||
|
case BEFORE_TIME:
|
||||||
|
return "MsgTable.ExactTime <= :TimeStamp";
|
||||||
|
|
||||||
|
case AFTER_TIME:
|
||||||
|
return "MsgTable.ExactTime >= :TimeStamp";
|
||||||
|
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Cannot serve a filter of type " + filterType);
|
throw new IllegalArgumentException("Cannot serve a filter of type " + filterType);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,12 @@ import meerkat.comm.CommunicationException;
|
||||||
import meerkat.comm.MessageInputStream;
|
import meerkat.comm.MessageInputStream;
|
||||||
import meerkat.comm.MessageOutputStream;
|
import meerkat.comm.MessageOutputStream;
|
||||||
import meerkat.comm.MessageInputStream.MessageInputStreamFactory;
|
import meerkat.comm.MessageInputStream.MessageInputStreamFactory;
|
||||||
|
import meerkat.crypto.Digest;
|
||||||
import meerkat.crypto.concrete.ECDSASignature;
|
import meerkat.crypto.concrete.ECDSASignature;
|
||||||
|
import meerkat.crypto.concrete.SHA256Digest;
|
||||||
import meerkat.protobuf.BulletinBoardAPI.*;
|
import meerkat.protobuf.BulletinBoardAPI.*;
|
||||||
import meerkat.util.BulletinBoardUtils;
|
import meerkat.util.BulletinBoardMessageGenerator;
|
||||||
|
import org.h2.util.DateTimeUtils;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
|
@ -59,6 +62,10 @@ public class GenericBulletinBoardServerTest {
|
||||||
|
|
||||||
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); // Used to time the tests
|
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); // Used to time the tests
|
||||||
|
|
||||||
|
private BulletinBoardMessageGenerator bulletinBoardMessageGenerator;
|
||||||
|
|
||||||
|
private Digest digest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param bulletinBoardServer is an initialized server.
|
* @param bulletinBoardServer is an initialized server.
|
||||||
* @throws InstantiationException
|
* @throws InstantiationException
|
||||||
|
@ -122,7 +129,11 @@ public class GenericBulletinBoardServerTest {
|
||||||
fail("Couldn't find signing key " + e.getMessage());
|
fail("Couldn't find signing key " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
random = new Random(0); // We use insecure randomness in tests for repeatability
|
// We use insecure randomness in tests for repeatability
|
||||||
|
random = new Random(0);
|
||||||
|
bulletinBoardMessageGenerator = new BulletinBoardMessageGenerator(random);
|
||||||
|
|
||||||
|
digest = new SHA256Digest();
|
||||||
|
|
||||||
long end = threadBean.getCurrentThreadCpuTime();
|
long end = threadBean.getCurrentThreadCpuTime();
|
||||||
System.err.println("Finished initializing GenericBulletinBoardServerTest");
|
System.err.println("Finished initializing GenericBulletinBoardServerTest");
|
||||||
|
@ -182,7 +193,10 @@ public class GenericBulletinBoardServerTest {
|
||||||
for (i = 1; i <= MESSAGE_NUM; i++) {
|
for (i = 1; i <= MESSAGE_NUM; i++) {
|
||||||
unsignedMsgBuilder = UnsignedBulletinBoardMessage.newBuilder()
|
unsignedMsgBuilder = UnsignedBulletinBoardMessage.newBuilder()
|
||||||
.setData(ByteString.copyFrom(data[i - 1]))
|
.setData(ByteString.copyFrom(data[i - 1]))
|
||||||
.setTimestamp(BulletinBoardUtils.toTimestampProto());
|
.setTimestamp(Timestamp.newBuilder()
|
||||||
|
.setSeconds(i)
|
||||||
|
.setNanos(i)
|
||||||
|
.build());
|
||||||
|
|
||||||
// Add tags based on bit-representation of message number.
|
// Add tags based on bit-representation of message number.
|
||||||
|
|
||||||
|
@ -620,6 +634,74 @@ public class GenericBulletinBoardServerTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSyncQuery()
|
||||||
|
throws SignatureException, CommunicationException, IOException,NoSuchMethodException, IllegalAccessException, InvocationTargetException {
|
||||||
|
|
||||||
|
Checksum checksum = new SimpleChecksum();
|
||||||
|
|
||||||
|
Timestamp timestamp = Timestamp.newBuilder()
|
||||||
|
.setSeconds(1)
|
||||||
|
.setNanos(0)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
BulletinBoardMessage newMessage = bulletinBoardMessageGenerator.generateRandomMessage(signers, timestamp, 10, 10);
|
||||||
|
|
||||||
|
BoolMsg result = bulletinBoardServer.postMessage(newMessage);
|
||||||
|
assertThat("Failed to post message to BB Server", result.getValue(), is(true));
|
||||||
|
|
||||||
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
|
||||||
|
digest.update(newMessage.getMsg());
|
||||||
|
|
||||||
|
ByteString messageID = ByteString.copyFrom(digest.digest());
|
||||||
|
|
||||||
|
MessageFilterList filterList = MessageFilterList.newBuilder()
|
||||||
|
.addFilter(MessageFilter.newBuilder()
|
||||||
|
.setType(FilterType.MSG_ID)
|
||||||
|
.setId(messageID)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
bulletinBoardServer.readMessages(filterList, new MessageOutputStream<BulletinBoardMessage>(outputStream));
|
||||||
|
|
||||||
|
MessageInputStream<BulletinBoardMessage> inputStream =
|
||||||
|
MessageInputStreamFactory.createMessageInputStream(new ByteArrayInputStream(
|
||||||
|
outputStream.toByteArray()),
|
||||||
|
BulletinBoardMessage.class);
|
||||||
|
|
||||||
|
long lastEntry = inputStream.asList().get(0).getEntryNum();
|
||||||
|
|
||||||
|
SyncQuery syncQuery = SyncQuery.newBuilder()
|
||||||
|
.setFilterList(MessageFilterList.getDefaultInstance())
|
||||||
|
.addQuery(SingleSyncQuery.newBuilder()
|
||||||
|
.setChecksum(2)
|
||||||
|
.setTimeOfSync(Timestamp.newBuilder()
|
||||||
|
.setSeconds(2)
|
||||||
|
.setNanos(0)
|
||||||
|
.build())
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
SyncQueryResponse queryResponse = bulletinBoardServer.querySync(syncQuery);
|
||||||
|
|
||||||
|
assertThat("Sync query replies with positive sync when no sync was expected", queryResponse.getLastEntryNum(), is(equalTo(-1l)));
|
||||||
|
|
||||||
|
syncQuery = SyncQuery.newBuilder()
|
||||||
|
.setFilterList(MessageFilterList.getDefaultInstance())
|
||||||
|
.addQuery(SingleSyncQuery.newBuilder()
|
||||||
|
.setChecksum(checksum.getChecksum(messageID))
|
||||||
|
.setTimeOfSync(timestamp)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
queryResponse = bulletinBoardServer.querySync(syncQuery);
|
||||||
|
|
||||||
|
assertThat("Sync query reply contained wrong last entry number", lastEntry, is(equalTo(queryResponse.getLastEntryNum())));
|
||||||
|
|
||||||
|
assertThat("Sync query reply contained wrong timestamp", timestamp, is(equalTo(queryResponse.getLastTimeOfSync())));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void close(){
|
public void close(){
|
||||||
signers[0].clearSigningKey();
|
signers[0].clearSigningKey();
|
||||||
signers[1].clearSigningKey();
|
signers[1].clearSigningKey();
|
||||||
|
|
|
@ -8,8 +8,11 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.ThreadMXBean;
|
import java.lang.management.ThreadMXBean;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.security.SignatureException;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
@ -144,6 +147,16 @@ public class MySQLBulletinBoardServerTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSyncQuery() {
|
||||||
|
try {
|
||||||
|
serverTest.testSyncQuery();
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println(e.getMessage());
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void close() {
|
public void close() {
|
||||||
System.err.println("Starting to close MySQLBulletinBoardServerTest");
|
System.err.println("Starting to close MySQLBulletinBoardServerTest");
|
||||||
|
|
|
@ -0,0 +1,122 @@
|
||||||
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import meerkat.crypto.Digest;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.MessageID;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 01-Mar-16.
|
||||||
|
* This interface is used to create checksums of Bulletin Board messages IDs
|
||||||
|
* This is useful in comparing database states
|
||||||
|
*/
|
||||||
|
public interface Checksum {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the Digest method which is used in creating message IDs from the messages themselves
|
||||||
|
* This method must be called with an initialized Digest before calling any methods that receive a parameter of type BulletinBoardMessage
|
||||||
|
* @param digest is the Digest that will be used to create message IDs from Bulletin Board Messages
|
||||||
|
*/
|
||||||
|
public void setDigest(Digest digest);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to reset the current checksum state
|
||||||
|
*/
|
||||||
|
public void reset();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the given ID
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
*/
|
||||||
|
public void update(MessageID messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the given collection of IDs
|
||||||
|
* @param messageIDs contains the message IDs
|
||||||
|
*/
|
||||||
|
public void update(Collection<MessageID> messageIDs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the given ID
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
*/
|
||||||
|
public void update(ByteString messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the given ID
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
*/
|
||||||
|
public void update(byte[] messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the message ID of the given message
|
||||||
|
* @param bulletinBoardMessage is the message whose ID should be added to the checksum
|
||||||
|
* @throws IllegalStateException if a Digest has not been set before calling this method
|
||||||
|
*/
|
||||||
|
public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the current checksum with the message IDs of the given messages
|
||||||
|
* @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum
|
||||||
|
* @throws IllegalStateException if a Digest has not been set before calling this method
|
||||||
|
*/
|
||||||
|
public void digestAndUpdate(Collection<BulletinBoardMessage> bulletinBoardMessages) throws IllegalStateException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current checksum without changing the checksum state
|
||||||
|
* @return the current checksum
|
||||||
|
*/
|
||||||
|
public long getChecksum();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the given ID and returns the resulting checksum
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long getChecksum(MessageID messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the given ID and returns the resulting checksum
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long getChecksum(ByteString messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the given ID and returns the resulting checksum
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param messageID is the message ID to be added
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long getChecksum(byte[] messageID);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the given IDs and returns the resulting checksum
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param messageIDs contains the message IDs to be added
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long getChecksum(Collection<MessageID> messageIDs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the message ID of the given message
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param bulletinBoardMessage is the message whose ID should be added to the checksum
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the current checksum with the message IDs of the given messages
|
||||||
|
* The checksum is not reset afterwards
|
||||||
|
* @param bulletinBoardMessages contains the messages whose IDs should be added to the checksum
|
||||||
|
* @return the updated checksum
|
||||||
|
*/
|
||||||
|
public long digestAndGetChecksum(Collection<BulletinBoardMessage> bulletinBoardMessages) throws IllegalStateException;
|
||||||
|
|
||||||
|
}
|
|
@ -29,6 +29,8 @@ public class GenericBatchDigest implements BatchDigest{
|
||||||
update(batchData);
|
update(batchData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
update(completeBatch.getTimestamp());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -32,10 +32,13 @@ public class GenericBatchDigitalSignature implements BatchDigitalSignature{
|
||||||
public void updateContent(CompleteBatch completeBatch) throws SignatureException {
|
public void updateContent(CompleteBatch completeBatch) throws SignatureException {
|
||||||
|
|
||||||
digitalSignature.updateContent(completeBatch.getBeginBatchMessage());
|
digitalSignature.updateContent(completeBatch.getBeginBatchMessage());
|
||||||
|
|
||||||
for (BatchData batchData : completeBatch.getBatchDataList()) {
|
for (BatchData batchData : completeBatch.getBatchDataList()) {
|
||||||
digitalSignature.updateContent(batchData);
|
digitalSignature.updateContent(batchData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
digitalSignature.updateContent(completeBatch.getTimestamp());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
package meerkat.bulletinboard;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import meerkat.crypto.Digest;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.MessageID;
|
||||||
|
import meerkat.protobuf.BulletinBoardAPI.BulletinBoardMessage;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by Arbel Deutsch Peled on 01-Mar-16.
|
||||||
|
* Implementation of Checksum via bitwise XOR of the bytes of message IDs
|
||||||
|
*/
|
||||||
|
public class SimpleChecksum implements Checksum{
|
||||||
|
|
||||||
|
private Digest digest;
|
||||||
|
private long checksum;
|
||||||
|
|
||||||
|
public SimpleChecksum() {
|
||||||
|
digest = null;
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDigest(Digest digest) {
|
||||||
|
this.digest = digest;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
checksum = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(MessageID messageID) {
|
||||||
|
ByteString messageIDByteString = messageID.getID();
|
||||||
|
update(messageIDByteString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(Collection<MessageID> messageIDs) {
|
||||||
|
|
||||||
|
for (MessageID messageID : messageIDs){
|
||||||
|
update(messageID);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(ByteString messageID) {
|
||||||
|
for (int i = 0 ; i < messageID.size() ; i++){
|
||||||
|
checksum &= messageID.byteAt(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(byte[] messageID) {
|
||||||
|
for (int i = 0 ; i < messageID.length ; i++){
|
||||||
|
checksum &= messageID[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDigest() throws IllegalStateException {
|
||||||
|
|
||||||
|
if (digest == null){
|
||||||
|
throw new IllegalStateException("Digest method not set. Use setDigest method before calling digestAndUpdate.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void digestAndUpdate(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException {
|
||||||
|
|
||||||
|
checkDigest();
|
||||||
|
|
||||||
|
digest.reset();
|
||||||
|
digest.update(bulletinBoardMessage);
|
||||||
|
update(digest.digest());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void digestAndUpdate(Collection<BulletinBoardMessage> bulletinBoardMessages) throws IllegalStateException {
|
||||||
|
|
||||||
|
for (BulletinBoardMessage bulletinBoardMessage : bulletinBoardMessages){
|
||||||
|
digestAndUpdate(bulletinBoardMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum() {
|
||||||
|
return checksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum(MessageID messageID) {
|
||||||
|
update(messageID);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum(ByteString messageID) {
|
||||||
|
update(messageID);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum(byte[] messageID) {
|
||||||
|
update(messageID);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum(Collection<MessageID> messageIDs) {
|
||||||
|
update(messageIDs);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long digestAndGetChecksum(BulletinBoardMessage bulletinBoardMessage) throws IllegalStateException {
|
||||||
|
digestAndUpdate(bulletinBoardMessage);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long digestAndGetChecksum(Collection<BulletinBoardMessage> bulletinBoardMessages) throws IllegalStateException {
|
||||||
|
digestAndUpdate(bulletinBoardMessages);
|
||||||
|
return getChecksum();
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import com.google.protobuf.Message;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ import java.util.List;
|
||||||
* Created by Arbel Deutsch Peled on 21-Feb-16.
|
* Created by Arbel Deutsch Peled on 21-Feb-16.
|
||||||
* A input stream of Protobuf messages
|
* A input stream of Protobuf messages
|
||||||
*/
|
*/
|
||||||
public class MessageInputStream<T extends Message>{
|
public class MessageInputStream<T extends Message> implements Iterable<T>{
|
||||||
|
|
||||||
private T.Builder builder;
|
private T.Builder builder;
|
||||||
|
|
||||||
|
@ -23,6 +24,33 @@ public class MessageInputStream<T extends Message>{
|
||||||
this.builder = (T.Builder) type.getMethod("newBuilder").invoke(type);
|
this.builder = (T.Builder) type.getMethod("newBuilder").invoke(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<T> iterator() {
|
||||||
|
|
||||||
|
return new Iterator<T>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
try {
|
||||||
|
return isAvailable();
|
||||||
|
} catch (IOException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T next() {
|
||||||
|
try {
|
||||||
|
return readMessage();
|
||||||
|
} catch (IOException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory class for actually creating a MessageInputStream
|
* Factory class for actually creating a MessageInputStream
|
||||||
*/
|
*/
|
||||||
|
@ -61,4 +89,10 @@ public class MessageInputStream<T extends Message>{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
in.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,4 +21,8 @@ public class MessageOutputStream<T extends Message> {
|
||||||
message.writeDelimitedTo(out);
|
message.writeDelimitedTo(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class BulletinBoardUtils {
|
||||||
* This method creates a Timestamp Protobuf from the current system time
|
* This method creates a Timestamp Protobuf from the current system time
|
||||||
* @return a Timestamp Protobuf encoding of the current system time
|
* @return a Timestamp Protobuf encoding of the current system time
|
||||||
*/
|
*/
|
||||||
public static com.google.protobuf.Timestamp toTimestampProto() {
|
public static com.google.protobuf.Timestamp getCurrentTimestampProto() {
|
||||||
|
|
||||||
return toTimestampProto(System.currentTimeMillis());
|
return toTimestampProto(System.currentTimeMillis());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue