This program reads a CSV file and writes it to a target location, applying a simple transformation to each record. In future projects I will integrate Kafka and Spark for more complex and heavier transformations.
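As a rough illustration of the read, transform and write flow described above, here is a minimal sketch that leaves Kafka out entirely. The object name `CsvTransformSketch`, the choice of transformation (trim every field, upper-case the second column) and the naive comma split are assumptions made for the example; they are not taken from the project code.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object CsvTransformSketch {
  // Hypothetical per-record transformation: trim every field and
  // upper-case the second column. Quoted commas are NOT handled.
  def transform(line: String): String = {
    val fields = line.split(",", -1).map(_.trim)
    fields.zipWithIndex.map {
      case (value, 1) => value.toUpperCase
      case (value, _) => value
    }.mkString(",")
  }

  // Read every line of the source file, transform it, write the result.
  def run(source: Path, target: Path): Unit = {
    val lines = Files.readAllLines(source, StandardCharsets.UTF_8).asScala
    val out = lines.filter(_.nonEmpty).map(transform)
    Files.write(target, out.asJava, StandardCharsets.UTF_8)
  }
}
```

In the project itself the records travel through a Kafka topic between the read and the write; the sketch only shows the shape of the per-record step.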
This project can also serve as a point of reference for several other functionalities, such as:
- Setting up a multi-broker Kafka cluster using a script
- Converting Avro type to a .txt file using a Kafka Consumer
- Terminating the Kafka Consumer if no data is polled for x seconds
- Enabling WebUI monitoring for Kafka topics and brokers
- Confluent Kafka 5.X.X
- Scala 2.11
- Gradle
- IntelliJ / Eclipse IDE
Create new
files. One file for each of the brokers.
cp $KAFKA_HOME/etc/kafka/ $KAFKA_HOME/etc/kafka/
cp $KAFKA_HOME/etc/kafka/ $KAFKA_HOME/etc/kafka/
cp $KAFKA_HOME/etc/kafka/ $KAFKA_HOME/etc/kafka/
Open $KAFKA_HOME/etc/kafka/
file and make the below changes
N.B.: If $KAFKA_HOME does not resolve properly, you can use an absolute path for log.dirs
Open $KAFKA_HOME/etc/kafka/
file and make the below changes
Open $KAFKA_HOME/etc/kafka/
file and make the below changes
Open $KAFKA_HOME/etc/kafka/
file and make the below changes
Open $KAFKA_HOME/etc/confluent-control-center/
file and make the below changes
I'm using the community version of Confluent Kafka, which gives me a 30-day trial period for the Control Center. This project comes with a switch to turn off Kafka Producer and Consumer monitoring. To disable monitoring, set enable.monitoring=false
in the properties.config file.
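One way such a switch could be wired up is sketched below, using only `java.util.Properties`. It assumes that monitoring is done through Confluent's monitoring interceptors (set via the Kafka client setting `interceptor.classes`) and that monitoring defaults to on when the key is absent; the object and method names are hypothetical, and the project's own code may handle the flag differently.

```scala
import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

object MonitoringSwitchSketch {
  // Confluent monitoring interceptor for producers (assumed to be on the classpath).
  val ProducerInterceptor =
    "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"

  // Load a properties file such as properties.config.
  def load(path: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }

  // Assumption: monitoring is on unless enable.monitoring is set to something other than "true".
  def monitoringEnabled(config: Properties): Boolean =
    config.getProperty("enable.monitoring", "true").trim.equalsIgnoreCase("true")

  // Copy the config, drop the custom switch, and add the interceptor only when enabled.
  def producerProps(config: Properties): Properties = {
    val props = new Properties()
    config.stringPropertyNames().asScala.foreach { key =>
      props.setProperty(key, config.getProperty(key))
    }
    props.remove("enable.monitoring")
    if (monitoringEnabled(config)) {
      props.setProperty("interceptor.classes", ProducerInterceptor)
    }
    props
  }
}
```

The Consumer side would follow the same pattern with `MonitoringConsumerInterceptor` from the same package.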
into project on all terminals you open.
Start zookeeper, brokers, schema-registry, control-center and create topics by running the script
Pavans-MacBook-Pro:KafkaFile_BatchProcessing pavanpkulkarni$ chmod +x
Pavans-MacBook-Pro:KafkaFile_BatchProcessing pavanpkulkarni$ ./
Open a new tab to build the project
Pavans-MacBook-Pro:KafkaFile_BatchProcessing pavanpkulkarni$ gradle clean build
Open new tab for Consumer and run
scala -cp build/libs/KafkaFile_BatchProcessing-1.0-SNAPSHOT.jar com.pavanpkulkarni.consumer.ConsumerApp tt properties.config
general syntax : scala -cp build/libs/KafkaFile_BatchProcessing-1.0-SNAPSHOT.jar com.pavanpkulkarni.consumer.ConsumerApp <topic_name> <property_file_name>
Open a new tab for the Producer and run the command below immediately
scala -cp build/libs/KafkaFile_BatchProcessing-1.0-SNAPSHOT.jar com.pavanpkulkarni.producer.ProducerApp tt properties.config
general syntax : scala -cp build/libs/KafkaFile_BatchProcessing-1.0-SNAPSHOT.jar com.pavanpkulkarni.producer.ProducerApp <topic_name> <property_file_name>
Go back to Consumer tab and check if you have received the messages from Producer.
Wait until counter reaches
for the Consumer to terminate.
Check output.txt file.
N.B.: We start the Consumer before the Producer. This way we allow the Consumer to join the
and be ready to accept data from Producer.
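The counter-based termination mentioned in the steps above can be sketched roughly as follows. The poll call is abstracted as a plain function so the example runs without a broker; in a real Consumer it would wrap the client's `poll` call. The names `consumeUntilIdle` and `maxEmptyPolls` are hypothetical and the project's actual loop may differ.

```scala
object IdleTimeoutSketch {
  // poll: stand-in for one call to the Kafka Consumer's poll method;
  //       returns the records received in that poll (possibly none).
  // maxEmptyPolls: number of consecutive empty polls after which we stop.
  // Returns the total number of records handled.
  def consumeUntilIdle(poll: () => Seq[String], maxEmptyPolls: Int)
                      (handle: String => Unit): Int = {
    var emptyPolls = 0
    var processed = 0
    while (emptyPolls < maxEmptyPolls) {
      val records = poll()
      if (records.isEmpty) {
        emptyPolls += 1          // nothing arrived: advance the idle counter
      } else {
        emptyPolls = 0           // data arrived: reset the idle counter
        records.foreach(handle)
        processed += records.size
      }
    }
    processed
  }
}
```

If each poll blocks for up to one second, `maxEmptyPolls` is approximately the "x seconds" of idle time after which the Consumer exits.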
The User.avsc file defines the Avro schema for this project. I have used the com.zlad.gradle.avrohugger
plugin to generate Scala-based class files for the Avro schema. A special mention goes to sourceFormat = SpecificRecord
in build.gradle, which enables smooth conversion of the .avsc file to a Scala class.
Run gradle clean run
to generate Avro classes. Check build/generated-src/avro
directory to view the system-generated Avro classes. Visit the Gradle Plugins Page to read more about this.