Merge pull request #16 from coursehero/master
Fixes #15
jruaux authored Oct 11, 2022
2 parents 5cc626c + f2ae363 commit 029fd22
Showing 3 changed files with 154 additions and 7 deletions.
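Judging from the diff alone, the bug behind #15: both source-record readers passed `Instant.now().getEpochSecond()` as the final argument of the `SourceRecord` constructor, but Kafka interprets that record timestamp as epoch milliseconds, so the value was roughly a factor of 1000 too small. The fix switches to `toEpochMilli()` and reads time from an injectable `Clock` so tests can pin it. A minimal sketch of the unit mismatch (the class name is illustrative, not from the PR):

```java
import java.time.Instant;

public class TimestampUnitsDemo {
	public static void main(String[] args) {
		Instant now = Instant.now();
		// Kafka record timestamps are epoch milliseconds; getEpochSecond()
		// yields a value ~1000x too small.
		System.out.println("seconds (what the old code sent): " + now.getEpochSecond());
		System.out.println("millis  (what Kafka expects):     " + now.toEpochMilli());
		// A seconds value misread as millis lands a few weeks after the epoch:
		System.out.println(Instant.ofEpochMilli(now.getEpochSecond())); // ~1970-01-20
	}
}
```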
KeySourceRecordReader.java
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

+import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +32,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String HASH_VALUE_SCHEMA_NAME = "com.redis.kafka.connect.HashEventValue";
private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
-			.name(HASH_VALUE_SCHEMA_NAME);
+			.name(HASH_VALUE_SCHEMA_NAME).build();

private final int batchSize;
private final String topic;
@@ -41,6 +41,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
private StatefulRedisPubSubConnection<String, String> pubSubConnection;
+	Clock clock = Clock.systemDefaultZone();

public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
super(sourceConfig);
@@ -99,7 +100,7 @@ protected SourceRecord convert(DataStructure<String> input) {
Map<String, ?> sourcePartition = new HashMap<>();
Map<String, ?> sourceOffset = new HashMap<>();
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
-				input.getValue(), Instant.now().getEpochSecond());
+				input.getValue(), clock.instant().toEpochMilli());
}

private Schema schema(DataStructure<String> input) {
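Both readers also gain a `.build()` at the end of their `SchemaBuilder` chains. `SchemaBuilder` implements `Schema`, so the un-built builder compiled and worked at runtime, but it is a mutable object that does not compare equal to the immutable schema `build()` returns; that matters once whole `SourceRecord`s are compared for equality, as the new test at the end of this commit does. A small sketch (assumes the Kafka `connect-api` artifact on the classpath; class name is illustrative):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class BuildDemo {
	public static void main(String[] args) {
		// Forgetting build() still compiles, because SchemaBuilder is a Schema...
		SchemaBuilder builder = SchemaBuilder
				.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
				.name("com.redis.kafka.connect.HashEventValue");
		// ...but the built schema is a distinct, immutable object.
		Schema built = builder.build();
		System.out.println(builder.equals(built)); // false
		System.out.println(built.equals(builder)); // false
	}
}
```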
StreamSourceRecordReader.java
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

+import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,14 +33,15 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamM
private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
-			.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
-			.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
+			.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
+			.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
private final String topic;
private final String consumer;

private StreamItemReader<String, String> reader;
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
+	Clock clock = Clock.systemDefaultZone();

public StreamSourceRecordReader(RedisSourceConfig sourceConfig, int taskId) {
super(sourceConfig);
@@ -92,7 +93,7 @@ protected SourceRecord convert(StreamMessage<String, String> message) {
Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody())
.put(FIELD_STREAM, message.getStream());
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value,
-				Instant.now().getEpochSecond());
+				clock.instant().toEpochMilli());
}

}
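The new package-private `clock` field (defaulting to `Clock.systemDefaultZone()`) is what makes the timestamp testable: a test can overwrite the field with `Clock.fixed(...)`, exactly as the new test file below does. A reduced sketch of the pattern, with hypothetical class names:

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

// Production code reads time from a package-private Clock field
// instead of calling Instant.now() directly.
class TimestampedReader {
	Clock clock = Clock.systemDefaultZone(); // test-overridable

	long recordTimestamp() {
		return clock.instant().toEpochMilli();
	}
}

class TimestampedReaderDemo {
	public static void main(String[] args) {
		TimestampedReader reader = new TimestampedReader();
		// A test pins "now" so the expected record timestamp is deterministic:
		reader.clock = Clock.fixed(Instant.ofEpochMilli(1_665_446_400_000L),
				ZoneId.systemDefault());
		System.out.println(reader.recordTimestamp()); // always 1665446400000
	}
}
```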
StreamSourceRecordReaderTest.java
@@ -0,0 +1,145 @@
package com.redis.kafka.connect.source;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import io.lettuce.core.StreamMessage;

class StreamSourceRecordReaderTest {

public static final String OFFSET_FIELD = "offset";
public static final String FIELD_ID = "id";
public static final String FIELD_BODY = "body";
public static final String FIELD_STREAM = "stream";
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final long NOW = System.currentTimeMillis();
private static final Clock CLOCK = Clock.fixed(Instant.ofEpochMilli(NOW), ZoneId.systemDefault());

// This is published, so if it's changed, it may impact users.
private static final Schema PUBLISHED_SCHEMA =
SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();

@ParameterizedTest
@ArgumentsSource(ConvertArgs.class)
void testConvertStreamMessageOfStringString(ConvertArgs args) {
final StreamSourceRecordReader r = new StreamSourceRecordReader(args.config, 0);
r.clock = CLOCK;

final SourceRecord got = r.convert(args.message);

assertThat(got, equalTo(args.want));
}

static class ConvertArgs implements ArgumentsProvider, Arguments {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
return Stream.of(
new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream1")),
new StreamMessage<>("stream1", "1-0", mapOf()),
new SourceRecord(
mapOf(),
mapOf("offset", "1-0"),
"stream1",
null,
Schema.STRING_SCHEMA,
"1-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "1-0")
.put(FIELD_STREAM, "stream1")
.put(FIELD_BODY, mapOf()),
NOW)),

new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream2")),
new StreamMessage<>("stream2", "2-0", mapOf("key2", "value2")),
new SourceRecord(
mapOf(),
mapOf("offset", "2-0"),
"stream2",
null,
Schema.STRING_SCHEMA,
"2-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "2-0")
.put(FIELD_STREAM, "stream2")
.put(FIELD_BODY, mapOf("key2", "value2")),
NOW)),

new ConvertArgs(
new RedisSourceConfig(
mapOf(
"redis.stream.name",
"stream3",
"topic",
"topic3")),
new StreamMessage<>("stream3", "3-0", mapOf("key3", "value3")),
new SourceRecord(
mapOf(),
mapOf("offset", "3-0"),
"topic3",
null,
Schema.STRING_SCHEMA,
"3-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "3-0")
.put(FIELD_STREAM, "stream3")
.put(FIELD_BODY, mapOf("key3", "value3")),
NOW))
);
}

RedisSourceConfig config;
StreamMessage<String, String> message;
SourceRecord want;

ConvertArgs() {
}

ConvertArgs(RedisSourceConfig config, StreamMessage<String, String> message, SourceRecord want) {
this.config = config;
this.message = message;
this.want = want;
}

@Override
public Object[] get() {
return new Object[] { this };
}
}

	static Map<String, String> mapOf(String... args) {
		// Validate before iterating: checking after the loop would throw
		// ArrayIndexOutOfBoundsException on an odd argument count instead.
		if (args.length % 2 != 0) {
			throw new IllegalArgumentException("Expects an even number of arguments");
		}
		final HashMap<String, String> ret = new HashMap<>();
		for (int i = 0; i < args.length; i += 2) {
			ret.put(args[i], args[i + 1]);
		}
		return ret;
	}

}
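One structural note on the test: `ConvertArgs` implements both `ArgumentsProvider` and `Arguments`, and its `get()` returns `new Object[] { this }`, so each case reaches the test method as one typed object rather than a tuple of loose parameters. A reduced, hypothetical sketch of the same pattern:

```java
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

class SelfProvidingCaseTest {

	// One class is both the provider and the argument it provides.
	static class Case implements ArgumentsProvider, Arguments {
		String input;
		int want;

		Case() { // no-arg constructor required by @ArgumentsSource
		}

		Case(String input, int want) {
			this.input = input;
			this.want = want;
		}

		@Override
		public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
			return Stream.of(new Case("a", 1), new Case("bb", 2));
		}

		@Override
		public Object[] get() {
			return new Object[] { this }; // hand the whole case to the test
		}
	}

	@ParameterizedTest
	@ArgumentsSource(Case.class)
	void length(Case c) {
		Assertions.assertEquals(c.want, c.input.length());
	}
}
```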
