Public Repository

Last pushed: a year ago
Short Description
export data from Apache Kafka to Apache Cassandra
Full Description

What is kafka2cassandra?

It is an Alpine image with a Golang program using gocql and samara drivers which allows you to export data from Apache Kafka to Apache Cassandra.
The keyspace's table which will contain the information you want to import must have tree colums: id (timeuuid), utctime (timestamp) or sequence (int), and data (text); PRIMARY KEY (id, utctime). If the table does not exist, it will be autocreated.
Every readed message from Kafka consists of one or more literals separated by commas, for example: one,two,three four five,six_seven ,?¿eigth,2015-01-01 15:24:08.212-0000. One of these literals can be a timestamp, if you want to use it on the utctime colum, you can specify it by the enviroment variables. Otherwise, the timestamp will be auto generated. Also, it can be created a not global sequence instead of the timestamp.


  • Works with Apache Cassandra 3.x and Apache Kafka >= 0.8
  • It prints state messages in stderr
  • Very easy to configure through environment variables
  • Auto discover kafka peers from DNS name
  • Consumer group support
  • Customizable initial offset for topic consuming
  • Waits for keyspace to be created
  • Keyspace's table auto create
  • Waits for kafka to be ready
  • Waits for topic to be created
  • Auto reconnect and retry in case of error

How to use this image


$ docker run --name some-kafka2cassandra --env CLUSTER="" --env KEYSPACE="example" --env TABLE="imported" --env DATEPOSSITION="4" puyi/kafka2cassandra

IMPORTANT: if you do not set up the required enviroment variables, this image will not work correctly

Required Enviroment Variables


This variable is for controlling which Cassandra IP address to connect to.


This variable is for controlling which Keyspace you want to use. It must to be created in advance.


This variable is for controlling which Keyspace's Table you want to use to store the imported information. It must have tree colums: id (timeuuid), utctime (timestamp) or sequence (int), and data (text); PRIMARY KEY (id, utctime). It will be created if does not exist.

Others Enviroment Variables


Indicates which field in the imported data is the timestamp. The default value is 0, which means that there is no timestamp in the data and it will be autogenerated. If it is equal to -1, it means that timestamp is not required and instead it will be created a NOT global sequential sequence: 1,2,3,4...


The DNS name for input Kafka broker service. The default value is "kafka".


Port to connect to input Kafka peers. The default value is 9092


The topic to consume. The default value is "mytopic"


The kafka consumer group ID. The default value is "kafka-console-consumer"


The offset to start with in new topic. It can be "oldest" or "newest". The default value is "newest"


The buffer size channel which writes the Kafka menssages. The default value is 256

Timestamp format

yyyy-mm-dd HH:mm:ss.sss-Z

Where Z is the 4-digit time zone, expressing the time zone's difference from UTC
Verify the timestamp is correct, otherwise it will be ignored

Related work


Docker Pull Command