Commit fc99e65

feat: Added support for keyspace in Redis keys

Julien Ruaux committed May 25, 2022
1 parent fcf374d commit fc99e65

Showing 6 changed files with 143 additions and 103 deletions.
6 changes: 6 additions & 0 deletions README.adoc
@@ -10,4 +10,10 @@ image:https://codecov.io/gh/{project-owner}/{project-name}/branch/master/graph/b

Kafka Connect source and sink connectors for https://redis.com/redis-enterprise-software/overview/[Redis Enterprise]

== Documentation

Refer to the link:https://{project-owner}.github.io/{project-name}[documentation] for configuration and usage information.

== Docker Example

Run `docker/run.sh` and follow the prompts
4 changes: 2 additions & 2 deletions docker/run.sh
@@ -12,7 +12,7 @@ echo "Building the Redis Enterprise Kafka Connector"
(
cd ..
./mvnw clean package
find ./target/components/packages -type d -name "redis-redis-enterprise-kafka-5.*" -mindepth 2 -maxdepth 2 -exec mv {} ./target/components/packages/redis-enterprise-kafka \;
find ./target/components/packages -mindepth 2 -maxdepth 2 -type d -name "redis-redis-enterprise-kafka-5.*" -exec mv {} ./target/components/packages/redis-enterprise-kafka \;
)

echo "Starting docker ."
@@ -150,7 +150,7 @@ The `pageviews` stream in Redis should contain the sunk page views: redis-cli xl
Examine the Redis database:
- In your shell run: docker-compose exec redis /usr/local/bin/redis-cli
- List some RedisJSON keys: SCAN 0 TYPE ReJSON-RL
- Show the JSON value of a given key: JSON.GET 971
- Show the JSON value of a given key: JSON.GET pageviews:971
==============================================================================================================
Use <ctrl>-c to quit'''
92 changes: 51 additions & 41 deletions src/docs/asciidoc/_sink.adoc
@@ -25,31 +25,22 @@ You can specify the number of tasks with the `tasks.max` configuration property.

[[data-structures]]
=== Redis Data Structures

Record keys and values have different roles depending on the target data structure.
The {name} supports the following Redis data-structure types as targets:

[[collection-key]]
==== Collections
For collections (stream, list, set, sorted set, timeseries) a single key is used which is independent of the record key.

Use the `redis.key` configuration property (default: `${topic}`) to specify a format string for the destination collection, which may contain `${topic}` as a placeholder for the originating topic name.

For example `kafka_${topic}` for the topic `orders` will map to the Redis key `kafka_orders`.

==== Stream

Use the following properties to store Kafka records as Redis stream messages:

[source,properties]
----
redis.type=STREAM
redis.key=<stream key> <1>
value.converter=<Avro or JSON> <2>
----

<1> <<collection-key,Stream key>>
<2> <<avro,Avro>> or <<kafka-json,JSON>>

* Collections: <<sync-stream,stream>>, <<sync-list,list>>, <<sync-set,set>>, <<sync-zset,sorted set>>, <<sync-timeseries,time series>>
+
Collection keys are generated using the `redis.key` configuration property which may contain `${topic}` (default) as a placeholder for the originating topic name.
+
For example with `redis.key = set:${topic}` and topic `orders` the Redis key is `set:orders`.

* <<sync-hash,Hash>>, <<sync-string,string>>, <<sync-json,JSON>>
+
For other data structures the key is in the form `<keyspace>:<record_key>`, where `keyspace` is generated using the `redis.key` configuration property as described above, `record_key` is the sink record key, and the `:` separator is configurable via `redis.separator` (see the example configuration below).
+
For example with `redis.key = ${topic}`, topic `orders`, and sink record key `123` the Redis key is `orders:123`.
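
For illustration, a minimal sketch that overrides the separator introduced in this commit (the topic name and separator choice are arbitrary, and `STRING` is assumed as the type token):

[source,properties]
----
redis.type=STRING
redis.key=${topic}
redis.separator=-
----

With topic `orders` and sink record key `123` the destination Redis key would be `orders-123`.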

[[sync-hash]]
==== Hash
Use the following properties to write Kafka records as Redis hashes:

@@ -62,8 +53,9 @@ value.converter=<Avro or JSON> <2>

<1> <<key-string,String>> or <<key-bytes,bytes>>
<2> <<avro,Avro>> or <<kafka-json,JSON>>.
If value is null the key is https://redis.io/commands/del[deleted].
If value is null the key is deleted.
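
For example, a minimal hash sink might be configured as follows (the converter classes are standard Kafka Connect ones, shown as one possible choice):

[source,properties]
----
redis.type=HASH
redis.key=${topic}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
----

A record from topic `users` with key `42` would be written as a hash at the Redis key `users:42`.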

[[sync-string]]
==== String
Use the following properties to write Kafka records as Redis strings:

@@ -76,8 +68,38 @@ value.converter=<string or bytes> <2>

<1> <<key-string,String>> or <<key-bytes,bytes>>
<2> <<value-string,String>> or <<value-bytes,bytes>>.
If value is null the key is https://redis.io/commands/del[deleted].
If value is null the key is deleted.
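
A minimal sketch using raw bytes for both key and value:

[source,properties]
----
redis.type=STRING
redis.key=${topic}
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
----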

[[sync-json]]
==== JSON
Use the following properties to write Kafka records as RedisJSON documents:

[source,properties]
----
redis.type=JSON
key.converter=<string or bytes> <1>
value.converter=<string or bytes> <2>
----

<1> <<key-string,String>> or <<key-bytes,bytes>>
<2> <<value-string,String>> or <<value-bytes,bytes>>.
If value is null the key is deleted.
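
For example, a sketch that stores JSON documents keyed by topic and record key:

[source,properties]
----
redis.type=JSON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
----

A document sunk from topic `pageviews` with record key `971` could then be inspected with `JSON.GET pageviews:971`, as in the Docker example above.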

[[sync-stream]]
==== Stream
Use the following properties to store Kafka records as Redis stream messages:

[source,properties]
----
redis.type=STREAM
redis.key=<stream key> <1>
value.converter=<Avro or JSON> <2>
----

<1> <<collection-key,Stream key>>
<2> <<avro,Avro>> or <<kafka-json,JSON>>
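
For instance, a sketch that prefixes the stream key with the topic name (the `events:` prefix is arbitrary):

[source,properties]
----
redis.type=STREAM
redis.key=events:${topic}
value.converter=org.apache.kafka.connect.json.JsonConverter
----

Records from topic `clicks` would be appended to the stream `events:clicks`, which can be verified with `XLEN events:clicks`.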

[[sync-list]]
==== List
Use the following properties to add Kafka record keys to a Redis list:

@@ -96,6 +118,7 @@ redis.push.direction=<LEFT or RIGHT> <3>
The Kafka record value can be any format.
If a value is null then the member is removed from the list (instead of pushed to the list).
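
A sketch that pushes record keys to the head of the list, assuming `LIST` as the type token:

[source,properties]
----
redis.type=LIST
redis.key=${topic}
key.converter=org.apache.kafka.connect.storage.StringConverter
redis.push.direction=LEFT
----

`LEFT` corresponds to a left push (LPUSH) and `RIGHT` to a right push (RPUSH).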

[[sync-set]]
==== Set
Use the following properties to add Kafka record keys to a Redis set:

@@ -112,6 +135,7 @@ key.converter=<string or bytes> <2>
The Kafka record value can be any format.
If a value is null then the member is removed from the set (instead of added to the set).
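
A sketch, assuming `SET` as the type token:

[source,properties]
----
redis.type=SET
redis.key=${topic}
key.converter=org.apache.kafka.connect.storage.StringConverter
----

Because Redis sets are deduplicated, sinking the same record key twice leaves a single member.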

[[sync-zset]]
==== Sorted Set
Use the following properties to add Kafka record keys to a Redis sorted set:

@@ -128,22 +152,8 @@ key.converter=<string or bytes> <2>
The Kafka record value should be `float64` and is used for the score.
If the score is null then the member is removed from the sorted set (instead of added to the sorted set).
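
A sketch, assuming `ZSET` as the type token and a `float64` record value for the score (the `leaderboard` keyspace is arbitrary):

[source,properties]
----
redis.type=ZSET
redis.key=leaderboard
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.DoubleConverter
----

A record with key `player1` and value `42.5` would add member `player1` to `leaderboard` with score `42.5`.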

[[redisjson]]
==== JSON
Use the following properties to write Kafka records as RedisJSON documents:

[source,properties]
----
redis.type=JSON
key.converter=<string or bytes> <1>
value.converter=<string or bytes> <2>
----

<1> <<key-string,String>> or <<key-bytes,bytes>>
<2> <<value-string,String>> or <<value-bytes,bytes>>.
If value is null the key is https://redis.io/commands/del[deleted].

==== TimeSeries
[[sync-timeseries]]
==== Time Series

Use the following properties to write Kafka records as RedisTimeSeries samples:

RedisEnterpriseSinkConfig.java
@@ -48,9 +48,14 @@ public enum PushDirection {

public static final String KEY_CONFIG = "redis.key";
public static final String KEY_DEFAULT = TOKEN_TOPIC;
public static final String KEY_DOC = "A format string for the destination stream/set/zset/list key, which may contain '"
+ TOKEN_TOPIC + "' as a placeholder for the originating topic name.\nFor example, ``kafka_" + TOKEN_TOPIC
+ "`` for the topic 'orders' will map to the Redis key " + "'kafka_orders'.";
public static final String KEY_DOC = "A format string for destination key space, which may contain '" + TOKEN_TOPIC
+ "' as a placeholder for the originating topic name.\nFor example, ``kafka_" + TOKEN_TOPIC
+ "`` for the topic 'orders' will map to the Redis key space "
+ "'kafka_orders'.\nLeave empty for passthrough (only applicable to non-collection data structures).";

public static final String SEPARATOR_CONFIG = "redis.separator";
public static final String SEPARATOR_DEFAULT = ":";
public static final String SEPARATOR_DOC = "Separator for non-collection destination keys.";

public static final String MULTIEXEC_CONFIG = "redis.multiexec";
public static final String MULTIEXEC_DEFAULT = "false";
@@ -78,7 +83,8 @@ public enum PushDirection {

private final Charset charset;
private final DataType type;
private final String keyFormat;
private final String keyspace;
private final String separator;
private final PushDirection pushDirection;
private final boolean multiexec;
private final int waitReplicas;
@@ -89,7 +95,8 @@ public RedisEnterpriseSinkConfig(Map<?, ?> originals) {
String charsetName = getString(CHARSET_CONFIG).trim();
charset = Charset.forName(charsetName);
type = ConfigUtils.getEnum(DataType.class, this, TYPE_CONFIG);
keyFormat = getString(KEY_CONFIG).trim();
keyspace = getString(KEY_CONFIG).trim();
separator = getString(SEPARATOR_CONFIG).trim();
pushDirection = ConfigUtils.getEnum(PushDirection.class, this, PUSH_DIRECTION_CONFIG);
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC_CONFIG));
waitReplicas = getInt(WAIT_REPLICAS_CONFIG);
@@ -104,8 +111,12 @@ public DataType getType() {
return type;
}

public String getKeyFormat() {
return keyFormat;
public String getKeyspace() {
return keyspace;
}

public String getSeparator() {
return separator;
}

public PushDirection getPushDirection() {
@@ -138,10 +149,13 @@ public RedisEnterpriseSinkConfigDef(ConfigDef base) {
private void define() {
define(ConfigKeyBuilder.of(CHARSET_CONFIG, ConfigDef.Type.STRING).documentation(CHARSET_DOC)
.defaultValue(CHARSET_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
define(ConfigKeyBuilder.of(TYPE_CONFIG, ConfigDef.Type.STRING).documentation(TYPE_DOC).defaultValue(TYPE_DEFAULT)
.importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(DataType.class)).build());
define(ConfigKeyBuilder.of(KEY_CONFIG, ConfigDef.Type.STRING).documentation(KEY_DOC).defaultValue(KEY_DEFAULT)
.importance(ConfigDef.Importance.MEDIUM).build());
define(ConfigKeyBuilder.of(TYPE_CONFIG, ConfigDef.Type.STRING).documentation(TYPE_DOC)
.defaultValue(TYPE_DEFAULT).importance(ConfigDef.Importance.HIGH)
.validator(Validators.validEnum(DataType.class)).build());
define(ConfigKeyBuilder.of(KEY_CONFIG, ConfigDef.Type.STRING).documentation(KEY_DOC)
.defaultValue(KEY_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
define(ConfigKeyBuilder.of(SEPARATOR_CONFIG, ConfigDef.Type.STRING).documentation(SEPARATOR_DOC)
.defaultValue(SEPARATOR_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
define(ConfigKeyBuilder.of(PUSH_DIRECTION_CONFIG, ConfigDef.Type.STRING).documentation(PUSH_DIRECTION_DOC)
.defaultValue(PUSH_DIRECTION_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
define(ConfigKeyBuilder.of(MULTIEXEC_CONFIG, ConfigDef.Type.BOOLEAN).documentation(MULTIEXEC_DOC)
@@ -186,7 +200,7 @@ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result
+ Objects.hash(charset, keyFormat, multiexec, pushDirection, type, waitReplicas, waitTimeout);
+ Objects.hash(charset, keyspace, separator, multiexec, pushDirection, type, waitReplicas, waitTimeout);
return result;
}

@@ -199,9 +213,10 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
RedisEnterpriseSinkConfig other = (RedisEnterpriseSinkConfig) obj;
return Objects.equals(charset, other.charset) && Objects.equals(keyFormat, other.keyFormat)
&& multiexec == other.multiexec && pushDirection == other.pushDirection && type == other.type
&& waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout;
return Objects.equals(charset, other.charset) && Objects.equals(keyspace, other.keyspace)
&& Objects.equals(separator, other.separator) && multiexec == other.multiexec
&& pushDirection == other.pushDirection && type == other.type && waitReplicas == other.waitReplicas
&& waitTimeout == other.waitTimeout;
}

}