Last pushed: 2 years ago
Short Description
Runs Kafka with some configuration options
Full Description

Run Commands:

Start up ZooKeeper:

docker run -d -p 49181:2181 --name zookeeper jplock/zookeeper

Start up the first instance:

docker run -d --name kafka -p 9092:9092 -e BROKER_ID=1 -e HOST_IP=172.17.42.1 -e PORT=9092 --link zookeeper:zk levidehaan/kafka:0.8.1

Start up the second instance (it needs its own container name, because the name kafka is already taken by the first):

docker run -d --name kafka2 -p 9093:9093 -e BROKER_ID=2 -e HOST_IP=172.17.42.1 -e PORT=9093 --link zookeeper:zk levidehaan/kafka:0.8.1
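The two broker commands differ only in container name, broker id, and port. A minimal Python sketch of that pattern follows; the helper name kafka_run_args and the container name kafka2 are illustrative assumptions, not part of the image.

```python
def kafka_run_args(name, broker_id, host_ip, port, image="levidehaan/kafka:0.8.1"):
    """Build the `docker run` argument list for one linked Kafka broker.

    Hypothetical helper: it only assembles the command, it does not run it.
    """
    return [
        "docker", "run", "-d",
        "--name", name,                 # must be unique per container
        "-p", f"{port}:{port}",         # publish the broker port on the host
        "-e", f"BROKER_ID={broker_id}",  # must be unique per broker
        "-e", f"HOST_IP={host_ip}",
        "-e", f"PORT={port}",
        "--link", "zookeeper:zk",
        image,
    ]


first = kafka_run_args("kafka", 1, "172.17.42.1", 9092)
second = kafka_run_args("kafka2", 2, "172.17.42.1", 9093)
print(" ".join(first))
print(" ".join(second))
```

The resulting lists can be passed to subprocess.run if you want to start the containers from Python.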

To use a ZooKeeper instance without linking containers, pass its address directly:

docker run -ti --name kafka -e BROKER_ID=1 -e HOST_IP=10.1.10.13 -e PORT=9092 -e ZK_PORT_2181_TCP_ADDR=10.1.10.105 levidehaan/kafka:0.8.1

ZK_PORT_2181_TCP_ADDR is where you specify the IP address of your ZooKeeper host.
At the moment the start script has no port setting for ZooKeeper, so the default port, 2181, is used.
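The variable name comes from Docker's legacy link feature: with --link zookeeper:zk, Docker sets environment variables derived from the alias and the exposed port. Passing the variable with -e supplies the same value by hand. A small sketch of the naming scheme (link_env_names is a hypothetical helper, not a Docker API):

```python
def link_env_names(alias, port, proto="tcp"):
    """Return the per-port variable names a legacy Docker link defines."""
    prefix = f"{alias.upper()}_PORT_{port}_{proto.upper()}"
    return [prefix, f"{prefix}_ADDR", f"{prefix}_PORT", f"{prefix}_PROTO"]


names = link_env_names("zk", 2181)
print(names)
```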

Here is the script that runs in the Kafka container to start the processes:

sed -r -i "s/(zookeeper.connect)=(.*)/\1=$ZK_PORT_2181_TCP_ADDR/g" $KAFKA_HOME/config/server.properties 
sed -r -i "s/(broker.id)=(.*)/\1=$BROKER_ID/g" $KAFKA_HOME/config/server.properties 
sed -r -i "s/#(advertised.host.name)=(.*)/\1=$HOST_IP/g" $KAFKA_HOME/config/server.properties 
sed -r -i "s/^(port)=(.*)/\1=$PORT/g" $KAFKA_HOME/config/server.properties
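To see what these substitutions do, the sketch below applies the same four patterns to a small properties file held in a string. The sample file contents and the function name apply_start_script are assumptions made for illustration; the real server.properties shipped in the image may differ.

```python
import re

# Assumed excerpt of a server.properties file, for illustration only.
SAMPLE = """\
broker.id=0
port=9092
#advertised.host.name=<hostname routable by clients>
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000
"""


def apply_start_script(text, zk_addr, broker_id, host_ip, port):
    """Mirror the four sed substitutions on a properties file in memory."""
    rules = [
        (r"(zookeeper.connect)=(.*)", zk_addr),
        (r"(broker.id)=(.*)", broker_id),
        (r"#(advertised.host.name)=(.*)", host_ip),  # also drops the leading '#'
        (r"^(port)=(.*)", port),
    ]
    for pattern, value in rules:
        # re.MULTILINE lets ^ match at every line start, as sed does per line.
        text = re.sub(pattern, lambda m, v=value: f"{m.group(1)}={v}",
                      text, flags=re.MULTILINE)
    return text


result = apply_start_script(SAMPLE, "10.1.10.105", 1, "10.1.10.13", 9092)
print(result)
```

Note that zookeeper.connect ends up holding only the address, with no port, and that the zookeeper.connection.timeout.ms line is left untouched because the first pattern requires an equals sign directly after "connect".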

To test this out with Python you can use kafka-python. Install it with:
pip install kafka-python


Kafka Emitter:

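from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# Connect to the broker published on port 9093.
kafka = KafkaClient("localhost:9093")

# Synchronous producer. Message payloads are passed as bytes.
producer = SimpleProducer(kafka)
producer.send_messages("my_topic", b"some message")
producer.send_messages("my_topic", b"this method", b"is variadic")

# Asynchronous producer. The keyword is async_send in kafka-python 1.4.4
# and later; older releases call it async, which is a reserved word
# from Python 3.7 on.
producer = SimpleProducer(kafka, async_send=True)
producer.send_messages("my_topic", b"async message")

# Synchronous producer that waits for the local write to be acknowledged.
producer = SimpleProducer(kafka, async_send=False,
                        req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                        ack_timeout=2000)

response = producer.send_messages("my_topic", b"sync message")

if response:
    print(response[0].error)
    print(response[0].offset)

# Batching producer: send after 20 messages are collected or every 60 seconds.
producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=20, batch_send_every_t=60)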

kafka.close()
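The batch_send_every_n and batch_send_every_t options describe a flush-on-count-or-time policy. The toy class below sketches that policy in plain Python so it can be run without a broker. ToyBatcher is a hypothetical illustration and not kafka-python's implementation; in particular it only checks the timer when a message is added.

```python
import time


class ToyBatcher:
    """Collect messages; flush after every_n messages or every_t seconds."""

    def __init__(self, send, every_n=20, every_t=60, clock=time.monotonic):
        self.send = send          # callable that receives a list of messages
        self.every_n = every_n
        self.every_t = every_t
        self.clock = clock
        self.pending = []
        self.last_flush = clock()

    def add(self, message):
        self.pending.append(message)
        full = len(self.pending) >= self.every_n
        stale = self.clock() - self.last_flush >= self.every_t
        if full or stale:
            self.flush()

    def flush(self):
        if self.pending:
            self.send(self.pending)
            self.pending = []     # start a fresh list; the sent one is kept intact
        self.last_flush = self.clock()


sent = []
batcher = ToyBatcher(sent.append, every_n=3, every_t=60)
for i in range(7):
    batcher.add(f"message {i}".encode())
batcher.flush()  # push out the final partial batch
print([len(batch) for batch in sent])
```

With every_n=3 and seven messages, the batches hold 3, 3, and 1 messages.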

Kafka Consumer:

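from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer

kafka = KafkaClient("localhost:9093")

# Option 1: consume with two worker processes.
consumer = MultiProcessConsumer(kafka, "my-group", "my_topic", num_procs=2)

# Option 2: give each worker process two partitions. This assignment
# replaces the consumer created above, so keep only the option you want.
consumer = MultiProcessConsumer(kafka, "my-group", "my_topic",
                              partitions_per_proc=2)

print("Listening for Kafka")

# Iterate over messages from the topic.
for message in consumer:
    print(message)

# Or fetch up to 5 messages, blocking for at most 4 seconds.
for message in consumer.get_messages(count=5, block=True, timeout=4):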
    print(message)
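The difference between num_procs and partitions_per_proc is easier to see with numbers: the first fixes the number of worker processes, the second fixes how many partitions each worker gets and so determines the worker count. The sketch below assumes a simple round-robin split; split_partitions is a hypothetical helper for illustration and may not match how kafka-python assigns partitions internally.

```python
import math


def split_partitions(partitions, num_procs=1, partitions_per_proc=0):
    """Divide partition ids among worker processes, round-robin."""
    if partitions_per_proc:
        # A per-process quota determines how many processes are needed.
        num_procs = math.ceil(len(partitions) / partitions_per_proc)
    return [partitions[i::num_procs] for i in range(num_procs)]


by_procs = split_partitions([0, 1, 2, 3, 4], num_procs=2)
by_quota = split_partitions([0, 1, 2, 3, 4], partitions_per_proc=2)
print(by_procs)
print(by_quota)
```

For a five-partition topic, num_procs=2 yields two workers, while partitions_per_proc=2 yields three workers with at most two partitions each.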

This container is based off http://wurstmeister.github.io/kafka-docker/

Owner
levidehaan