Kubernetes Kafka (K8SKafka)

This project contains a Docker image meant to facilitate the deployment of
Apache Kafka on Kubernetes using StatefulSets.

Limitations

  1. Persistent Volumes must be used. emptyDirs will likely result in a loss of
    data.
  2. Storage media I/O isolation is not generally possible at this time. Consider
    using Pod Anti-Affinity rules to place noisy neighbors on separate Nodes.

Docker Image

The Docker image contained in this repository is built from a base
Ubuntu 16.04 image, the latest release of the OpenJDK JRE based on the 1.8
JVM (JDK 8u111), and the latest stable release of Kafka (0.10.2.0) using
Scala 2.11. Ubuntu is a much larger image than BusyBox or Alpine, but those
images ship with musl or uClibc rather than glibc, which would require a custom
version of OpenJDK built against a libc runtime other than glibc. While there
are smaller Kafka images based on Alpine and BusyBox, the interactions between
Kafka, the JVM, and glibc are better understood and easier to debug.

The image is built such that the Kafka JVM process is designated to run as a
non-root user. By default, this user is kafka and has UID 1000 and GID 1000.
The Kafka package is installed into the /opt/kafka directory, all
configuration is installed into /opt/kafka/config, and all executables are in
/opt/kafka/bin. Due to the implementation of the scripts in /opt/kafka/bin, it
is not feasible to symbolically link them into the /usr/bin directory. As such,
the /opt/kafka/bin directory is added to the PATH environment variable.
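
Because the image runs Kafka as the non-root kafka user (UID 1000, GID 1000),
it can help to make this explicit in the StatefulSet's Pod spec so that mounted
Persistent Volumes are writable by that user. The fragment below is a minimal
sketch only; the container name and image are placeholders rather than the
actual manifest.

spec:
  # Run the container processes as the kafka user baked into the image and
  # make mounted volumes group-writable by GID 1000.
  securityContext:
    runAsUser: 1000
    fsGroup: 1000
  containers:
  - name: kafka                     # hypothetical container name
    image: <your-kafka-image>       # placeholder for the image in this repository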

ZooKeeper

Kafka requires an installation of Apache ZooKeeper for broker configuration
storage and coordination. An example of how to deploy a ZooKeeper ensemble on
Kubernetes can be found
here.
For testing purposes an ensemble of 1-3 servers is sufficient. For production
use, you should consider deploying at least 5 servers so that you can tolerate
the loss of one server during the planned maintenance of another. If you are
running ZooKeeper on Kubernetes, it is best to use a separate ensemble for each
Kafka cluster. For production use, you should ensure that each ZooKeeper server
has at least 2 GiB of heap with at least 4 GiB of reserved memory for the Pod.
As ZooKeeper is not particularly CPU intensive, 2 CPUs per server should be
sufficient for most use cases. If you are running Kubernetes on a Cloud
Provider (e.g. GCP, Azure, or AWS), you should provision a fast storage class
for the ZooKeeper PVs. As the PVs are backed by network attached storage, there
is little to be gained from isolating the write ahead log from the snapshots
directory.
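
As a rough translation of the sizing guidance above into a Pod spec, the
fragment below is a sketch only; the container name, image, and heap
environment variable are illustrative assumptions and not taken from the linked
ZooKeeper example.

containers:
- name: zookeeper                  # hypothetical container name
  image: <your-zookeeper-image>    # placeholder image
  resources:
    requests:
      memory: "4Gi"                # at least 4 GiB of memory reserved for the Pod
      cpu: "2"                     # 2 CPUs per server is sufficient for most use cases
  env:
  - name: JVMFLAGS                 # heap knob read by zkEnv.sh; depends on the image used
    value: "-Xmx2G -Xms2G"         # at least 2 GiB of ZooKeeper heap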

Headless Service

The Kafka StatefulSet requires a Headless Service to control the network domain
for the Kafka brokers. The YAML below creates a Headless Service that allows
brokers to be discovered and exposes port 9093 for client connections.

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka

StatefulSet

The Kafka StatefulSet deploys a configurable number of replicas on the
Kubernetes cluster. The StatefulSet's serviceName must match the name of the
Headless Service, and its replicas field specifies the desired number of
brokers.

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-svc
  replicas: 3
  ...

Configuration

This section details the configuration of the Kafka cluster.

Broker Configuration

The configuration for each broker is generated by overriding the default
configuration with command line flags. The overrides cover the high and medium
importance configuration parameters from the
Kafka documentation.

 kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
           --override listeners=PLAINTEXT://:9093 \
           --override zookeeper.connect=zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
           --override auto.create.topics.enable=true \
           --override auto.leader.rebalance.enable=true \
           --override background.threads=10 \
           --override compression.type=producer \
           --override delete.topic.enable=false \
           --override leader.imbalance.check.interval.seconds=300 \
           --override leader.imbalance.per.broker.percentage=10 \
           --override log.flush.interval.messages=9223372036854775807 \
           --override log.flush.offset.checkpoint.interval.ms=60000 \
           --override log.flush.scheduler.interval.ms=9223372036854775807 \
           --override log.retention.bytes=-1 \
           --override log.retention.hours=168 \
           --override log.roll.hours=168 \
           --override log.roll.jitter.hours=0 \
           --override log.segment.bytes=1073741824 \
           --override log.segment.delete.delay.ms=60000 \
           --override message.max.bytes=1000012 \
           --override min.insync.replicas=1 \
           --override num.io.threads=8 \
           --override num.network.threads=3 \
           --override num.recovery.threads.per.data.dir=1 \
           --override num.replica.fetchers=1 \
           --override offset.metadata.max.bytes=4096 \
           --override offsets.commit.required.acks=-1 \
           --override offsets.commit.timeout.ms=5000 \
           --override offsets.load.buffer.size=5242880 \
           --override offsets.retention.check.interval.ms=600000 \
           --override offsets.retention.minutes=1440 \
           --override offsets.topic.compression.codec=0 \
           --override offsets.topic.num.partitions=50 \
           --override offsets.topic.replication.factor=3 \
           --override offsets.topic.segment.bytes=104857600 \
           --override queued.max.requests=500 \
           --override quota.consumer.default=9223372036854775807 \
           --override quota.producer.default=9223372036854775807 \
           --override replica.fetch.min.bytes=1 \
           --override replica.fetch.wait.max.ms=500 \
           --override replica.high.watermark.checkpoint.interval.ms=5000 \
           --override replica.lag.time.max.ms=10000 \
           --override replica.socket.receive.buffer.bytes=65536 \
           --override replica.socket.timeout.ms=30000 \
           --override request.timeout.ms=30000 \
           --override socket.receive.buffer.bytes=102400 \
           --override socket.request.max.bytes=104857600 \
           --override socket.send.buffer.bytes=102400 \
           --override unclean.leader.election.enable=true \
           --override zookeeper.session.timeout.ms=6000 \
           --override zookeeper.set.acl=false \
           --override broker.id.generation.enable=true \
           --override connections.max.idle.ms=600000 \
           --override controlled.shutdown.enable=true \
           --override controlled.shutdown.max.retries=3 \
           --override controlled.shutdown.retry.backoff.ms=5000 \
           --override controller.socket.timeout.ms=30000 \
           --override default.replication.factor=1 \
           --override fetch.purgatory.purge.interval.requests=1000 \
           --override group.max.session.timeout.ms=300000 \
           --override group.min.session.timeout.ms=6000 \
           --override inter.broker.protocol.version=0.10.2-IV0 \
           --override log.cleaner.backoff.ms=15000 \
           --override log.cleaner.dedupe.buffer.size=134217728 \
           --override log.cleaner.delete.retention.ms=86400000 \
           --override log.cleaner.enable=true \
           --override log.cleaner.io.buffer.load.factor=0.9 \
           --override log.cleaner.io.buffer.size=524288 \
           --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
           --override log.cleaner.min.cleanable.ratio=0.5 \
           --override log.cleaner.min.compaction.lag.ms=0 \
           --override log.cleaner.threads=1 \
           --override log.cleanup.policy=delete \
           --override log.index.interval.bytes=4096 \
           --override log.index.size.max.bytes=10485760 \
           --override log.message.timestamp.difference.max.ms=9223372036854775807 \
           --override log.message.timestamp.type=CreateTime \
           --override log.preallocate=false \
           --override log.retention.check.interval.ms=300000 \
           --override max.connections.per.ip=2147483647 \
           --override num.partitions=1 \
           --override producer.purgatory.purge.interval.requests=1000 \
           --override replica.fetch.backoff.ms=1000 \
           --override replica.fetch.max.bytes=1048576 \
           --override replica.fetch.response.max.bytes=10485760 \
           --override reserved.broker.max.id=1000
  • Note that the broker.id is extracted from the ordinal index of the
    StatefulSet's Pods.
  • The listeners configuration must specify the port indicated by the headless
    service (9093 in this case).
  • The zookeeper.connect string is a comma separated list of the host:port
    pairs of the ZooKeeper servers in the ensemble.

OS Image tuning

For production use, it is important to configure the base OS image to allow
for a sufficient number of file descriptors for your workload.

  • For each broker, (number of partitions) * (partition_size / segment_size)
    determines the number of files the Broker will have open at any given time.
    You must ensure that this will not result in the Broker process dying
    because it has exhausted its allowable number of file descriptors; one way
    to raise the limit from the Pod spec is sketched below.
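
Kubernetes does not expose file descriptor limits as a first-class Pod setting,
so a common workaround is to raise the soft limit before starting the broker,
up to whatever hard limit the Node and container runtime allow. The fragment
below is only a sketch: the container name, image, and limit value are
placeholders, and a real manifest would carry the full list of --override flags
shown above.

containers:
- name: kafka                      # hypothetical container name
  image: <your-kafka-image>        # placeholder image
  # Raise the soft file descriptor limit, then exec the broker so it
  # receives signals directly. The limit value is illustrative.
  command:
  - sh
  - -c
  - >
    ulimit -n 65536 &&
    exec kafka-server-start.sh /opt/kafka/config/server.properties
    --override broker.id=${HOSTNAME##*-}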

CPU

Typical production Kafka broker deployments run on dual processor Xeons with
multiple hardware threads per core. However, CPU is highly unlikely to be the
bottleneck for your deployment; an 8 CPU allocation should be more than
sufficient for good performance. You should start by simulating your workload
with 2-4 CPUs and titrating up from there.
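
Expressed as a container resource request, a starting point consistent with the
guidance above might look like the fragment below; the specific numbers are
illustrative rather than a recommendation from the upstream project.

resources:
  requests:
    cpu: "2"        # start by simulating your workload with 2-4 CPUs
  limits:
    cpu: "4"        # raise only if measurement shows CPU is actually the bottleneck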

Memory

Kafka utilizes the OS page cache heavily to buffer data. To fully understand
the interaction of Kafka and Linux containers you should read this and this.
In particular, it is important to understand the accounting and isolation
offered for the page cache by a memory cgroup.
If your primary concern is isolation and performance, you should do the
following.

  • Determine the number of seconds of data you want to buffer t (time).
  • Determine the total write throughput of the deployment tp (storage/time).
  • tp * t gives the amount of memory that you should reserve for buffering.
    This should be set as the memory request for the container, as in the
    sketch below.
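
As a worked example, buffering t = 30 seconds of data at a total write
throughput of tp = 100 MB/s works out to roughly 3 GB of page cache. The
fragment below uses those illustrative numbers; the broker's JVM heap is
additional and is configured separately.

resources:
  requests:
    memory: "4Gi"   # ~3 GB of page cache (100 MB/s * 30 s) plus headroom for the broker JVM heap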

Disk

Disk throughput is the most common bottleneck that users encounter with Kafka.
Given that Persistent Volumes are backed by network attached storage, the
throughput is, in most cases, capped on a per Node basis without respect to the
number of Persistent Volumes that are attached to the Node. For instance, if
you are deploying Kafka onto a GKE or GCP based Kubernetes cluster, and if you
use the standard PD type, your maximum sustained per instance throughput is
120 MB/s (Write) and 180 MB/s (Read). If you have multiple applications, each
with a Persistent Volume mounted, these numbers represent the total achievable
throughput. If you find that you have contention, you should consider using
Pod Anti-Affinity rules to ensure that noisy neighbors are not collocated on
the same Node.
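
On GKE or GCP, one way to gain throughput headroom is to provision a faster
StorageClass and reference it from the StatefulSet's volumeClaimTemplates,
since SSD persistent disks have higher per-instance throughput caps than
standard PDs. The fragment below is a sketch for GCE persistent disks only; the
class name is an arbitrary choice, and other clouds use different provisioners
and parameters.

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: fast                       # arbitrary name; reference it from the PersistentVolumeClaims
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd                     # SSD persistent disks; pd-standard is the default type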

Pod Affinity

The Kafka Pod in the StatefulSet's PodTemplateSpec contains a Pod Anti-Affinity
rule and a Pod Affinity rule.

    affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values: 
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values: 
                        - zk
                 topologyKey: "kubernetes.io/hostname"

The Pod Anti-Affinity rule ensures that two Kafka Brokers will never be
launched on the same Node. This isn't strictly necessary, but it helps provide
stronger availability guarantees in the presence of Node failure, and it helps
alleviate disk throughput bottlenecks.
The Pod Affinity rule attempts to collocate Kafka and ZooKeeper on the same
Nodes. You will likely have more Kafka brokers than ZooKeeper servers, but the
Kubernetes scheduler will attempt to, where possible, collocate Kafka brokers
and ZooKeeper servers while respecting the hard spreading enforced by the
Pod Anti-Affinity rule. This optimization attempts to minimize the amount of
network I/O between the ZooKeeper ensemble and the Kafka cluster. However, if
disk contention becomes an issue, it is equally valid to express a Pod
Anti-Affinity rule to ensure that ZooKeeper servers and Kafka brokers are not
scheduled onto the same node.
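
A sketch of that alternative rule, assuming the ZooKeeper Pods carry the
app: zk label used above, is shown below. It would sit alongside the existing
Anti-Affinity term for the kafka Pods and replace the preferred Pod Affinity
rule.

    affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"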

Testing

The easiest way to test your deployment is to create a topic and use the
console producer and consumer.
Use kubectl exec to execute a bash shell on one of the brokers.

> kubectl exec -ti kafka-0 -- bash

From the command line create a topic using kafka-topics.sh

> kafka-topics.sh --create \
--topic test \
--zookeeper zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
--partitions 3 \
--replication-factor 2

Run the console consumer as below.

> kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093

Use kubectl exec to execute a bash shell on another one of the brokers. You
can use the same broker, but using a different broker will demonstrate that
the system is working across multiple Nodes.

> kubectl exec -ti kafka-1 -- bash

Run the console producer and generate a few messages by typing into stdin.
Every time you press Enter you will flush a message to the consumer.

> kafka-console-producer.sh --topic test --broker-list localhost:9093
hello
I like kafka
goodbye

You will see the messages appear in the terminal in which the console consumer
is running.

> kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093
hello
I like kafka
goodbye

Horizontal Scaling

You can use kubectl scale to horizontally scale your cluster. The command below
will scale the number of brokers to two.

> kubectl scale statefulset kafka --replicas=2

You should note that, when you scale a Kafka cluster up or down, you will have
to use kafka-reassign-partitions.sh to ensure that your data is correctly
replicated and assigned after scaling.
