ryarnyah/kafka-offset

Kafka-Offset

Kafka metrics offset fetcher with some sinks :D

Installation

Binaries

sudo curl -L https://github.com/ryarnyah/kafka-offset/releases/download/0.8.4/kafka-offset-linux-amd64 -o /usr/local/bin/kafka-offset && sudo chmod +x /usr/local/bin/kafka-offset

Via Go

$ go get github.com/ryarnyah/kafka-offset

From Source

$ mkdir -p $GOPATH/src/github.com/ryarnyah
$ git clone https://github.com/ryarnyah/kafka-offset $GOPATH/src/github.com/ryarnyah/kafka-offset
$ cd !$
$ make

Running with Docker

docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 <option>

Usage

 _  __      __ _                ___   __  __          _
| |/ /__ _ / _| | ____ _       / _ \ / _|/ _|___  ___| |_
| ' // _` | |_| |/ / _` |_____| | | | |_| |_/ __|/ _ \ __|
| . \ (_| |  _|   < (_| |_____| |_| |  _|  _\__ \  __/ |_
|_|\_\__,_|_| |_|\_\__,_|      \___/|_| |_| |___/\___|\__|

 Scrape kafka metrics and send them to some sinks!
 Version: 0.8.4
 Build: 7a702fe-dirty

  -collectd-hostname string
    	Hostname for collectd plugin
  -collectd-interval string
    	Collectd interval
  -elasticsearch-password string
    	Elasticsearch password
  -elasticsearch-sink-index string
    	Elasticsearch index name (default "metrics")
  -elasticsearch-sink-url string
    	Elasticsearch sink URL (default "http://localhost:9200")
  -elasticsearch-username string
    	Elasticsearch username
  -influxdb-addr string
    	Hostname of influxdb (default "http://localhost:8086")
  -influxdb-database string
    	Influxdb database (default "metrics")
  -influxdb-password string
    	Influxdb user password
  -influxdb-retention-policy string
    	Influxdb retention policy (default "default")
  -influxdb-username string
    	Influxdb username
  -kafka-sink-brokers string
    	Kafka sink brokers (default "localhost:9092")
  -kafka-sink-sasl-password string
    	Kafka SASL password
  -kafka-sink-sasl-username string
    	Kafka SASL username
  -kafka-sink-ssl-cacerts string
    	Kafka SSL cacerts
  -kafka-sink-ssl-cert string
    	Kafka SSL cert
  -kafka-sink-ssl-insecure
    	Kafka insecure ssl connection
  -kafka-sink-ssl-key string
    	Kafka SSL key
  -kafka-sink-topic string
    	Kafka topic to send metrics (default "metrics")
  -kafka-sink-version string
    	Kafka sink broker version (default "0.10.2.0")
  -log-level string
    	Log level (default "info")
  -plugin-cmd string
    	Command to launch the plugin with arguments (ex: /usr/local/bin/my-plugin --test)
  -plugin-name string
    	Plugin type to use. Only kafka_grpc is supported by now. (default "kafka_grpc")
  -plugin-tls-ca-file string
    	TLS CA file (client trust)
  -plugin-tls-cert-file string
    	TLS certificate file (client trust)
  -plugin-tls-cert-key-file string
    	TLS certificate key file (client trust)
  -plugin-tls-insecure
    	Check TLS certificate against CA File
  -profile string
    	Profile to apply to log (default "prod")
  -profiling-enable
    	Enable profiling
  -profiling-host string
    	HTTP profiling host:port (default "localhost:6060")
  -sink string
    	Sink to use (log, kafka, elasticsearch, collectd, plugin) (default "log")
  -sink-produce-interval duration
    	Time beetween metrics production (default 1m0s)
  -source-brokers string
    	Kafka source brokers (default "localhost:9092")
  -source-kafka-version string
    	Kafka source broker version (default "0.10.2.0")
  -source-sasl-password string
    	Kafka SASL password
  -source-sasl-username string
    	Kafka SASL username
  -source-scrape-interval duration
    	Time beetween scrape kafka metrics (default 1m0s)
  -source-ssl-cacerts string
    	Kafka SSL cacerts
  -source-ssl-cert string
    	Kafka SSL cert
  -source-ssl-insecure
    	Kafka insecure ssl connection
  -source-ssl-key string
    	Kafka SSL key
  -version
    	Print version

Supported metrics

| Name | Exposed information |
| --- | --- |
| kafka_topic_partition | Number of partitions for this Topic |
| kafka_topic_partition_offset_newest | Newest offset for this Topic/Partition |
| kafka_topic_partition_offset_oldest | Oldest offset for this Topic/Partition |
| kafka_leader_topic_partition | NodeID leader for this Topic/Partition |
| kafka_replicas_topic_partition | Number of replicas for this Topic/Partition |
| kafka_in_sync_replicas | Number of In-Sync Replicas for this Topic/Partition |
| kafka_leader_is_preferred_topic_partition | 1 if Topic/Partition is using the Preferred Broker |
| kafka_under_replicated_topic_partition | 1 if Topic/Partition is under-replicated |
| kafka_consumer_group_lag | Lag for this Consumer group / Topic |
| kafka_consumer_group_latest_offset | Latest offset for this Consumer group |
| kafka_topic_rate | Rate of Topic production |
| kafka_consumer_group_rate | Rate of this Consumer group consumer on this Topic |
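To make the lag and under-replication metrics concrete, here is a minimal sketch of how such values are conventionally derived from offsets and replica counts. It assumes the usual definitions (lag is the newest offset minus the group's committed offset; a partition is under-replicated when fewer replicas are in sync than assigned). The function names are hypothetical, and the tool's exact computation may differ.

```go
package main

import "fmt"

// consumerLag returns how far a consumer group is behind on one partition.
// Assumption: lag = newest offset - last committed offset, clamped at 0.
func consumerLag(newest, committed int64) int64 {
	if committed > newest {
		return 0
	}
	return newest - committed
}

// underReplicated returns 1 when fewer replicas are in sync than assigned,
// and 0 otherwise (the same 0/1 convention as the metric table above).
func underReplicated(replicas, inSync int) int {
	if inSync < replicas {
		return 1
	}
	return 0
}

func main() {
	fmt.Println(consumerLag(1500, 1420)) // prints 80
	fmt.Println(underReplicated(3, 2))   // prints 1
}
```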

Supported Sinks

Log (-sink log)

Simple log sink with glog

Example
docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 -sink log -source-brokers localhost:9092

Kafka (-sink kafka)

The Kafka sink exports metrics as JSON to the specified topic. SASL and SSL are supported.

Example
docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 -sink kafka -source-brokers localhost:9092 -kafka-sink-brokers localhost:9092 -kafka-sink-topic metrics

Elasticsearch (-sink elasticsearch)

The Elasticsearch V6 sink exports metrics as documents to the specified index. Authentication is supported.

Example
docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 -sink elasticsearch -source-brokers localhost:9092 -elasticsearch-sink-url http://localhost:9200 -elasticsearch-sink-index metrics

Collectd (-sink collectd)

Collectd Exec plugin sink (see deploy/collectd/types.db)

Example
TypesDB "/opt/collectd/share/collectd/kafka/types.db.custom"
<Plugin exec>
	Exec nobody "/bin/sh" "-c" "/usr/local/bin/kafka-offset -sink collectd -source-brokers localhost:9092 -log-level panic -source-scrape-interval 10s -sink-produce-interval 10s"
</Plugin>
Install collectd types
sudo mkdir -p /opt/collectd/share/collectd/kafka/
sudo curl -L https://raw.githubusercontent.com/ryarnyah/kafka-offset/master/deploy/collectd/types.db -o /opt/collectd/share/collectd/kafka/types.db.custom
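For context, collectd's Exec plugin reads values from the child process's standard output using the plain-text protocol (`PUTVAL` lines). The sketch below shows the general shape of such a line. The host, plugin, type and instance names are hypothetical placeholders: the identifiers kafka-offset actually emits depend on the types defined in types.db.custom and are not reproduced here.

```go
package main

import "fmt"

// putval formats one line of collectd's plain-text protocol:
//   PUTVAL "host/plugin/type-instance" interval=<seconds> N:<value>
// "N" tells collectd to use the current time as the timestamp.
// All identifier parts passed in main are hypothetical placeholders.
func putval(host, plugin, typ, instance string, interval int, value float64) string {
	return fmt.Sprintf("PUTVAL \"%s/%s/%s-%s\" interval=%d N:%g",
		host, plugin, typ, instance, interval, value)
}

func main() {
	// Prints: PUTVAL "kafka01/kafka/gauge-lag" interval=10 N:42
	fmt.Println(putval("kafka01", "kafka", "gauge", "lag", 10, 42))
}
```

The host and interval parts of such a line presumably correspond to the -collectd-hostname and -collectd-interval flags listed under Usage.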

InfluxDB (-sink influxdb)

The InfluxDB sink exports metrics to the specified database (which must already exist) using the default retention policy.

Example
docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 -sink influxdb -source-brokers localhost:9092 -influxdb-addr http://localhost:8086

Plugin (-sink plugin)

The plugin sink lets you implement your own sink as a go-plugin.

Example (log sink implemented with plugin)
package main

import (
	"github.com/hashicorp/go-plugin"
	// Assumed import path for the metrics types used below.
	"github.com/ryarnyah/kafka-offset/pkg/metrics"
	"github.com/ryarnyah/kafka-offset/pkg/sinks/plugin/shared"
	"github.com/sirupsen/logrus"
)

// StdoutSink is a sink that logs every metric it receives.
type StdoutSink struct{}

// WriteKafkaMetrics dispatches each metric to a handler based on its type.
func (s StdoutSink) WriteKafkaMetrics(m []interface{}) error {
	for _, metric := range m {
		switch metric := metric.(type) {
		case metrics.KafkaMeter:
			s.kafkaMeter(metric)
		case metrics.KafkaGauge:
			s.kafkaGauge(metric)
		}
	}
	return nil
}

func (StdoutSink) kafkaMeter(metric metrics.KafkaMeter) {
	logrus.Infof("offsetMetrics %+v", metric)
}

func (StdoutSink) kafkaGauge(metric metrics.KafkaGauge) {
	logrus.Infof("consumerGroupOffsetMetrics %+v", metric)
}

func main() {
	// Serve the sink over gRPC under the "kafka_grpc" plugin name,
	// which is the default value of -plugin-name.
	plugin.Serve(&plugin.ServeConfig{
		HandshakeConfig: shared.Handshake,
		Plugins: map[string]plugin.Plugin{
			"kafka_grpc": &shared.KafkaGRPCPlugin{Impl: &StdoutSink{}},
		},
		GRPCServer: plugin.DefaultGRPCServer,
	})
}

To run it:

docker run ryarnyah/kafka-offset-linux-amd64:0.8.4 -sink plugin -source-brokers localhost:9092 -plugin-cmd "/plugins/my-plugin arg1 arg2"