
Commit d80e503

Merge pull request #129 from caraml-dev/change-kafka-topic-subscribe-pattern

feat: Set subscribe pattern to wildcard prefix
shydefoo authored Jan 3, 2025
2 parents 190eec2 + 46b51a5 commit d80e503
Showing 2 changed files with 37 additions and 3 deletions.
@@ -54,7 +54,11 @@ object StreamingPipeline extends BasePipeline with Serializable {
         sparkSession.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", source.bootstrapServers)
-          .option("subscribe", source.topic)
+          .option("subscribePattern", s"^(.*\\.|)${source.topic}")
+          .option(
+            "kafka.metadata.max.age.ms",
+            "5000"
+          ) // set max age to 5s to refresh for topics every 5s
           .load()
       case source: MemoryStreamingSource =>
         source.read
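For context (not part of the commit): the new subscribePattern value is a regular expression whose empty alternative matches the bare topic name, while the ".*\." branch matches any dot-terminated prefix such as "GCP.". A minimal, self-contained Scala sketch of that behaviour follows, using a hypothetical topic name in place of source.topic:

import java.util.regex.Pattern

object SubscribePatternSketch extends App {
  // Hypothetical topic name for illustration only; the pipeline uses source.topic.
  val topic = "driver_events"

  // Same shape as the pattern added above: "^(.*\.|)<topic>".
  val pattern = Pattern.compile(s"^(.*\\.|)${topic}")

  // A full match succeeds for the bare topic and for any dot-prefixed variant.
  Seq("driver_events", "GCP.driver_events", "other_topic").foreach { name =>
    println(s"$name -> ${pattern.matcher(name).matches()}")
  }
  // Prints: true, true, false
}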
@@ -17,7 +17,9 @@ import dev.caraml.spark.helpers.RedisStorageHelper.{
 import dev.caraml.spark.helpers.TestRow
 import dev.caraml.store.protobuf.types.ValueProto.ValueType
 import org.apache.commons.codec.digest.DigestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.avro.to_avro
@@ -129,8 +131,8 @@ class StreamingPipelineSpec extends SparkSpec with ForAllTestContainer {
     val rows = generateDistinctRows(rowGenerator, 1000, groupByEntity)
     rows.foreach(sendToKafka(kafkaSource.topic, _))

+    Thread.sleep(10000); // sleep for 10s to allow topic discovery
     query.processAllAvailable()

     rows.foreach { r =>
       val encodedEntityKey = encodeEntityKey(r, config.featureTable)
       val storedValues = jedis.hgetAll(encodedEntityKey).asScala.toMap
@@ -145,8 +147,36 @@ class StreamingPipelineSpec extends SparkSpec with ForAllTestContainer {
       val keyTTL = jedis.ttl(encodedEntityKey).toInt
       keyTTL shouldEqual -1
     }

   }
+
+  "Streaming pipeline" should "store valid proto messages from kafka to redis when reading from topics with prefix" in new Scope {
+    val configWithKafka = config.copy(source = kafkaSource)
+    val query = StreamingPipeline.createPipeline(sparkSession, configWithKafka).get
+    query.processAllAvailable() // to init kafka consumer
+
+    val rowsV2 = generateDistinctRows(rowGenerator, 10, groupByEntity)
+    rowsV2.foreach(sendToKafka(s"GCP.${kafkaSource.topic}", _))
+
+    Thread.sleep(10000); // sleep for 10s to allow topic discovery
+    query.processAllAvailable()
+    rowsV2.foreach { r =>
+      val encodedEntityKey = encodeEntityKey(r, config.featureTable)
+      val storedValues = jedis.hgetAll(encodedEntityKey).asScala.toMap
+      print(s"r2: ${r}, storedValues: ${storedValues}")
+      storedValues should beStoredRow(
+        Map(
+          featureKeyEncoder("unique_drivers") -> r.getUniqueDrivers,
+          murmurHashHexString("_ts:driver-fs") -> new java.sql.Timestamp(
+            r.getEventTimestamp.getSeconds * 1000
+          )
+        )
+      )
+      val keyTTL = jedis.ttl(encodedEntityKey).toInt
+      keyTTL shouldEqual -1
+    }
+
+  }
   "Streaming pipeline" should "store messages from kafka to redis with expiry time equal to entity max age" in new Scope {
     val maxAge = 86400L
     val entityMaxAge = 1728000L
@@ -380,7 +410,7 @@ class StreamingPipelineSpec extends SparkSpec with ForAllTestContainer {
.option("kafka.bootstrap.servers", kafkaContainer.bootstrapServers)
.option("topic", "avro")
.save()

Thread.sleep(10000)
query.processAllAvailable()

val redisKey = DigestUtils.md5Hex(s"default#customer:aaa").getBytes()
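The spec also gains imports for ConsumerConfig, KafkaConsumer and ByteArrayDeserializer, though the visible hunks do not show how they are used. For illustration only, a plain Kafka consumer subscribing with the same wildcard-prefix pattern might look roughly like the sketch below; the bootstrap address, group id and topic name are placeholders, not values from the repository.

import java.util.Properties
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object WildcardSubscribeSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-check")           // placeholder
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  // Refresh topic metadata every 5s so newly created prefixed topics are discovered
  // quickly, mirroring the kafka.metadata.max.age.ms option added to the pipeline.
  props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  // Kafka matches the pattern against full topic names, so both "driver_events"
  // and "GCP.driver_events" fall under this subscription.
  consumer.subscribe(Pattern.compile("^(.*\\.|)driver_events"))

  val records = consumer.poll(java.time.Duration.ofSeconds(5))
  println(s"fetched ${records.count()} records")
  consumer.close()
}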
