Skip to content

Latest commit

 

History

History
227 lines (171 loc) · 9.21 KB

File metadata and controls

227 lines (171 loc) · 9.21 KB

Overview

Welcome to your kafka demo workshop. In this demo we will try to cover the basics of kafka and will create an streaming elt pipline using ksqlDB.

Kafka Demo Overview

Kafka

  • Apache Kafka is an distributed event streaming platform optimized for ingesting and processing streaming data in real-time.
  • LinkedIn developed Kafka in 2011 as a high-throughput message broker for its own use, then open-sourced and donated Kafka to the Apache Software Foundation

Kafka Terminology

Term Description
Events An event represents a fact that happened in the past. Events are immutable
Streams Events in motion
Topics Topic is a materialized event stream
Partitions Partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion
Offsets The records in the partitions are each assigned a sequential identifier called the offset. Messages within a partition are ordered, messages across a topic are not guaranteed to be ordered
Broker A broker is an instance (server, computer, container) running Kakfa process. Manage partitions, handles read write requests, manage replication of partitions
Zookeeper Kafka uses ZooKeeper to store its metadata. Data such as the location of partitions and the configuration of topics are stored outside of Kafka in ZooKeeper
Producers Producers are processes that push records into Kafka topics
Consumers Consumers are processes that pulls records into Kafka topics
Replication factor The replication factor controls how many servers will replicate each message that is written. If you have a replication factor of 3, then up to 2 servers can fail before you will lose access to your data
Controller Kafka elects one broker in the cluster to be the controller. The controller is responsible for cluster management and handles events like broker failures, leader election, topic deletion and more.

Kafka Components

Kafka Components

Demo

Start docker containers

🛑 Please stop any existing containers before proceeding to next steps 🛑

  • Clone this repo

  • Open an new terminal and cd into kafka-postgres-superset directory. This directory contains infra components for this demo

  • Create a copy of .env.template as .env

  • Start Kafka containers by running

    docker-compose up --remove-orphans -d --build
  • Validate the container by running

    docker ps
  • Navigate to Control Center in your browser

  • Navigate to pgAdmin in your browser and login with admin@admin.com and postgres

  • Click on Add New Server and give following details

    • Name : kafka demo
    • Host Name : postgres
    • User : postgres
    • Password : postgres
  • Navigate to Superset in your browser and login with admin and admin

⚠️ If you change the credentials in .env the please make to use them ⚠️

Kafka Broker

Validate the Kafka Broker by running

docker exec -it kafkacat kafkacat -b broker:9092 -L

Kafka Producer

  • Produce sample messages to demo topic by running
docker exec -it kafkacat kafkacat -b broker:9092 -t demo -P -K~
1~Iron Man
2~Ant Man
3~Spider Man
4~Avengers
  • We will produce more message using Kafka connector and Datagen.
{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false",
    "max.interval": 30000,
    "iterations": -1,
    "tasks.max": "1"
  }
}

Kafka Stream Processsing (ksqlDB)

  • Start ksqlDB’s CLI using the command docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

  • Create stream process by running below sql

SET 'auto.offset.reset' = 'earliest';

-- Create RAW Stream
CREATE
	OR REPLACE STREAM STM_USER_0010_RAW (ROWKEY VARCHAR KEY)
	WITH (
			KAFKA_TOPIC = 'users'
			,VALUE_FORMAT = 'AVRO'
			);

-- Create table to count users
CREATE
	OR REPLACE TABLE TBL_USER_0020_GENDER_COUNT AS

SELECT GENDER, AS_VALUE(GENDER) AS GENDER_KEY, COUNT(GENDER) AS GENDER_COUNT
FROM STM_USER_0010_RAW
GROUP BY GENDER EMIT CHANGES;

Kafka Consumer

  • Consume messages from demo topic by running
docker exec -it kafkacat kafkacat -b broker:9092 -t demo -C -o beginning

docker exec -it kafkacat kafkacat -b broker:9092 -t demo -C -o 3

docker exec -it kafkacat kafkacat -b broker:9092 -t demo -o beginning -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'

docker exec -it kafkacat kafkacat -b broker:9092 -t demo -o 3 -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'
  • We will consume message using Kafka connector
{
  "name": "postgres-sink-users-gender",
  "config": {
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "false",
    "topics": "TBL_USER_0020_GENDER_COUNT",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "delete.enabled": true,
    "table.name.format": "demo.users_gender_count",
    "pk.mode": "record_key",
    "pk.fields": "gender_key",
    "auto.create": "true"
  }
}

Create Superset Dashboard

  • Create database postgresql://postgres:postgres@postgres:5432/postgres
  • Create dataset
  • Create chart and add to dashboard

Sample Dashboard

Clean demo resources

  • Exit ksqlDB cli by running exit

  • Open an new terminal and cd into kafka-postgres-superset directory.

  • Run below command to delete the docker containers and related volumes docker-compose down --volumes --remove-orphans

  • Delete exited containers by running docker rm $(docker ps -q -f status=exited)

Reference

Notes

  • dos2unix find . -type f -print0 | xargs -0 -n 1 -P 4 dos2unix
  • git git config --global core.autocrlf true