Skip to content

Commit

Permalink
feat: Writer / reader interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Roberts <ryan@blockchaintp.com>
  • Loading branch information
ryan-s-roberts committed Jul 22, 2021
1 parent 78069c0 commit 7bec6e1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 24 deletions.
21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,19 @@
<version>2.13.0-rc1</version>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>

<!-- Test deps -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -303,6 +314,16 @@
<scope>test</scope>
</dependency>

<!-- Rx -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>

<!-- AWS -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@
* @param <I>
* the type of the sequence
*/
public interface TransactionLog<I, K, V> extends TransactionLogWriter<K, V> {
public interface TransactionLog<K, V, I> extends TransactionLogWriter<K, V, I> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,25 @@
*/
package com.blockchaintp.daml.stores.service;

import java.util.Map;

import org.reactivestreams.Publisher;

/**
*
* @param <I>
* Sequence type
* @param <K>
* Log entry identifier type
* @param <V>
* Log entry type
*/
public interface TransactionLogReader {
public interface TransactionLogReader<I, K, V> {
/**
* Stream committed log entries starting at offset.
*
* @param offset
* @return A stream of comitted log entires.
*/
Publisher<Map.Entry<K, V>> from(I offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,39 @@
*/
package com.blockchaintp.daml.stores.service;

import java.util.List;
import java.util.Map;

import com.blockchaintp.daml.stores.exception.StoreWriteException;

/**
* A TransactionLogWriter is a StoreWriter that also supports the sending of events.
*
* @param <K>
* the type of the keys
* the type of the transaction log id
*
* @param <V>
* the type of the transaction value
*
* @param <I>
* the type of the sequence
* the type of the transaction sequence
*
*/
public interface TransactionLogWriter<I, K> {

public interface TransactionLogWriter<K, V, I> {
/**
* Send an event on the specified topic with the specified payload.
*
* @param topic
* the topic to publish the event
* @param data
* the data payload/
* @throws StoreWriteException
* an error writing to the store
* @return The transaction log id of the uncommitted transaction
*/
void sendEvent(String topic, String data) throws StoreWriteException;
K begin(V data);

/**
* Send an event on multiple topics with multiple payloads.
* Commit the transaction with this identifier.
*
* @param listOfTopicDataPairs
* a map where the key is the topic and the value is the data payload
* @throws StoreWriteException
* an error writing to the store
* @param txId
* @return the position in the log of the transaction.
*/
void sendEvent(List<Map.Entry<String, String>> listOfTopicDataPairs) throws StoreWriteException;
I commit(K txId);

/**
* Abort the transaction with this identifier.
*
* @param txId
*/
void abort(K txId);
}

0 comments on commit 7bec6e1

Please sign in to comment.