Skip to content

Commit

Permalink
Fixes #15
Browse files Browse the repository at this point in the history
Kafka message timestamps are in milliseconds, but the SourceRecordReader
implementations in this package were using seconds. This just changes
them to use milliseconds instead.
  • Loading branch information
ahawtho committed Oct 7, 2022
1 parent 5cc626c commit f2ae363
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -32,7 +32,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String HASH_VALUE_SCHEMA_NAME = "com.redis.kafka.connect.HashEventValue";
private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
.name(HASH_VALUE_SCHEMA_NAME);
.name(HASH_VALUE_SCHEMA_NAME).build();

private final int batchSize;
private final String topic;
Expand All @@ -41,6 +41,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
private StatefulRedisPubSubConnection<String, String> pubSubConnection;
Clock clock = Clock.systemDefaultZone();

public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
super(sourceConfig);
Expand Down Expand Up @@ -99,7 +100,7 @@ protected SourceRecord convert(DataStructure<String> input) {
Map<String, ?> sourcePartition = new HashMap<>();
Map<String, ?> sourceOffset = new HashMap<>();
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
input.getValue(), Instant.now().getEpochSecond());
input.getValue(), clock.instant().toEpochMilli());
}

private Schema schema(DataStructure<String> input) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -33,14 +33,15 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamM
private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
private final String topic;
private final String consumer;

private StreamItemReader<String, String> reader;
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
Clock clock = Clock.systemDefaultZone();

public StreamSourceRecordReader(RedisSourceConfig sourceConfig, int taskId) {
super(sourceConfig);
Expand Down Expand Up @@ -92,7 +93,7 @@ protected SourceRecord convert(StreamMessage<String, String> message) {
Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody())
.put(FIELD_STREAM, message.getStream());
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value,
Instant.now().getEpochSecond());
clock.instant().toEpochMilli());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package com.redis.kafka.connect.source;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import io.lettuce.core.StreamMessage;

class StreamSourceRecordReaderTest {

public static final String OFFSET_FIELD = "offset";
public static final String FIELD_ID = "id";
public static final String FIELD_BODY = "body";
public static final String FIELD_STREAM = "stream";
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final long NOW = System.currentTimeMillis();
private static final Clock CLOCK = Clock.fixed(Instant.ofEpochMilli(NOW), ZoneId.systemDefault());

// This is published, so if it's changed, it may impact users.
private static final Schema PUBLISHED_SCHEMA =
SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();

@ParameterizedTest
@ArgumentsSource(ConvertArgs.class)
void testConvertStreamMessageOfStringString(ConvertArgs args) {
final StreamSourceRecordReader r = new StreamSourceRecordReader(args.config, 0);
r.clock = CLOCK;

final SourceRecord got = r.convert(args.message);

assertThat(got, equalTo(args.want));
}

static class ConvertArgs implements ArgumentsProvider, Arguments {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
return Stream.of(
new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream1")),
new StreamMessage<>("stream1", "1-0", mapOf()),
new SourceRecord(
mapOf(),
mapOf("offset", "1-0"),
"stream1",
null,
Schema.STRING_SCHEMA,
"1-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "1-0")
.put(FIELD_STREAM, "stream1")
.put(FIELD_BODY, mapOf()),
NOW)),

new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream2")),
new StreamMessage<>("stream2", "2-0", mapOf("key2", "value2")),
new SourceRecord(
mapOf(),
mapOf("offset", "2-0"),
"stream2",
null,
Schema.STRING_SCHEMA,
"2-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "2-0")
.put(FIELD_STREAM, "stream2")
.put(FIELD_BODY, mapOf("key2", "value2")),
NOW)),

new ConvertArgs(
new RedisSourceConfig(
mapOf(
"redis.stream.name",
"stream3",
"topic",
"topic3")),
new StreamMessage<>("stream3", "3-0", mapOf("key3", "value3")),
new SourceRecord(
mapOf(),
mapOf("offset", "3-0"),
"topic3",
null,
Schema.STRING_SCHEMA,
"3-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "3-0")
.put(FIELD_STREAM, "stream3")
.put(FIELD_BODY, mapOf("key3", "value3")),
NOW))
);
}

RedisSourceConfig config;
StreamMessage<String, String> message;
SourceRecord want;

ConvertArgs() {
}

ConvertArgs(RedisSourceConfig config, StreamMessage<String, String> message, SourceRecord want) {
this.config = config;
this.message = message;
this.want = want;
}

@Override
public Object[] get() {
return new Object[] { this };
}
}

static Map<String, String> mapOf(String... args) {
final HashMap<String, String> ret = new HashMap<>();
int i = 0;
for (; i < args.length; i+=2) {
ret.put(args[i], args[i+1]);
}
if (i != args.length) {
throw new IllegalArgumentException("Expects an even number of arguments");
}
return ret;
}

}

0 comments on commit f2ae363

Please sign in to comment.