export data from Apache Kafka to Apache Cassandra
What is kafka2cassandra?

It is an Alpine image containing a Go program that uses the gocql and sarama drivers to export data from Apache Kafka to Apache Cassandra.
The keyspace table that receives the data must have three columns: id (timeuuid), utctime (timestamp) or sequence (int), and data (text); PRIMARY KEY (id, utctime). If the table does not exist, it is created automatically.
Every message read from Kafka consists of one or more literals separated by commas, for example: one,two,three four five,six_seven ,?¿eigth,2015-01-01 15:24:08.212-0000. One of these literals can be a timestamp; to use it for the utctime column, specify its position through the DATEPOSITION environment variable. Otherwise, the timestamp is generated automatically. Alternatively, a non-global sequence can be generated instead of the timestamp.
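
The message layout described above can be illustrated with a short Go sketch. It is not the image's actual code: the function names splitMessage and timestampLiteral are hypothetical, and it assumes that field positions are 1-based (since 0 is documented as "no timestamp") and that literals are not trimmed.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMessage splits a Kafka message payload into its comma-separated
// literals. Whitespace is kept as-is, because the description does not
// say whether literals are trimmed (assumption).
func splitMessage(payload string) []string {
	return strings.Split(payload, ",")
}

// timestampLiteral returns the literal at the given 1-based position.
// ok is false when the position does not select an existing field.
// Assumption: positions are 1-based, as 0 means "no timestamp".
func timestampLiteral(fields []string, position int) (literal string, ok bool) {
	if position < 1 || position > len(fields) {
		return "", false
	}
	return fields[position-1], true
}

func main() {
	msg := "one,two,three four five,six_seven ,?¿eigth,2015-01-01 15:24:08.212-0000"
	fields := splitMessage(msg)
	ts, ok := timestampLiteral(fields, 6)
	fmt.Println(len(fields), ts, ok)
}
```

With the example message, the timestamp is the sixth literal, so a position of 6 would select it under these assumptions.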

Features

  • Works with Apache Cassandra 3.x and Apache Kafka >= 0.8
  • Prints status messages to stderr
  • Very easy to configure through environment variables
  • Auto-discovers Kafka peers from a DNS name
  • Consumer group support
  • Customizable initial offset for topic consuming
  • Waits for keyspace to be created
  • Auto-creates the keyspace's table
  • Waits for Kafka to be ready
  • Waits for topic to be created
  • Auto reconnect and retry in case of error

How to use this image

example:

$ docker run --name some-kafka2cassandra --env CLUSTER="172.16.56.183" --env KEYSPACE="example" --env TABLE="imported" --env DATEPOSITION="4" puyi/kafka2cassandra

IMPORTANT: if you do not set the required environment variables, this image will not work correctly.

Required Environment Variables

CLUSTER

The IP address of the Cassandra node to connect to.

KEYSPACE

The keyspace to use. It must be created in advance.

TABLE

The table in the keyspace that stores the imported data. It must have three columns: id (timeuuid), utctime (timestamp) or sequence (int), and data (text); PRIMARY KEY (id, utctime). It is created if it does not exist.
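
As an illustration of the table layout above, the following Go sketch builds a matching CREATE TABLE statement. The helper createTableCQL is hypothetical, and two details are assumptions rather than documented behavior: the use of IF NOT EXISTS, and a primary key of (id, sequence) when the sequence column replaces utctime.

```go
package main

import "fmt"

// createTableCQL builds a CQL statement for the documented layout:
// id (timeuuid), utctime (timestamp) or sequence (int), data (text).
// Assumptions: IF NOT EXISTS is used, and the sequence variant keys
// on (id, sequence); the description only states PRIMARY KEY (id, utctime).
func createTableCQL(keyspace, table string, useSequence bool) string {
	col, typ := "utctime", "timestamp"
	if useSequence {
		col, typ = "sequence", "int"
	}
	return fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s.%s (id timeuuid, %s %s, data text, PRIMARY KEY (id, %s))",
		keyspace, table, col, typ, col)
}

func main() {
	// Keyspace and table names are taken from the docker run example.
	fmt.Println(createTableCQL("example", "imported", false))
}
```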

Other Environment Variables

DATEPOSITION

Indicates which field in the imported data holds the timestamp. The default value is 0, which means that the data contains no timestamp and one is generated automatically. If it is -1, no timestamp is required; instead, a non-global sequence is generated: 1,2,3,4...

KAFKA_SERVICE

The DNS name for input Kafka broker service. The default value is "kafka".

KAFKA_PORT

Port to connect to input Kafka peers. The default value is 9092

TOPIC

The topic to consume. The default value is "mytopic"

KAFKA_GROUP

The Kafka consumer group ID. The default value is "kafka-console-consumer"

OFFSET

The offset to start from when consuming a new topic. It can be "oldest" or "newest". The default value is "newest"

BUFFER_SIZE

The size of the buffered channel used to write the Kafka messages. The default value is 256

Timestamp format

yyyy-mm-dd HH:mm:ss.sss-Z

Where Z is the 4-digit time zone, expressing the time zone's difference from UTC.
Verify that the timestamp is correct; otherwise it will be ignored.

Related work

puyi/impcassandra
puyi/expcassandra
puyi/cassandra2kafka
kafka-console-producer
kafka-console-consumer

Owner
puyi
