Public | Automated Build

Last pushed: 11 days ago
Short Description
Kafka Connect image with all Debezium connectors, and part of the Debezium platform.
Full Description

Kafka Connect is a system for moving data into and out of Kafka. All Debezium connectors adhere to the Kafka Connector API for source connectors, and each monitors a specific kind of database management system for changing data, and then forwards those changes directly into Kafka topics organized by server, database, and table. This image defines a runnable Kafka Connect service preconfigured with all Debezium connectors. The service has a RESTful API for managing connector instances -- simply start up a container, configure a connector for each data source you want to monitor, and let Debezium monitor those sources for changes and forward them to the appropriate Kafka topics.

What is Debezium?

Debezium is a distributed platform that turns your existing databases into event streams, so applications can quickly react to each row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely.

Running Debezium involves Zookeeper, Kafka, and services that run Debezium's connectors. For simple evaluation and experimentation, all services can all be run on a single host machine, using the recipe outlined below. Production environments, however, require properly running and networking multiple instances of each service to provide the performance, reliability, replication, and fault tolerance. This can be done with a platform like OpenShift that manages multiple Docker containers running on multiple hosts and machines. But running Kafka in a Docker container has limitations, so for scenarios where very high throughput is required, you should run Kafka on dedicated hardware as explained in the Kafka documentation.

How to use this image

This image can be used in several different ways. All require an already-running Zookeeper service, which is either running locally via the container named zookeeper or with OpenShift running as a service named zookeeper. Also required are already-running Kafka brokers, which are either running locally via the container named kafka or with OpenShift running as a service named kafka.

Start a Kafka Connect service instance

When running a cluster of one or more Kafka Connect service instances, several important parameters must be defined using environment variables. Please see the section below for the list of these required environment variables and acceptable values.

Starting an instance of Kafka Connect using this image is simple:

$ docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect

This command uses this image and starts a new container named connect, which runs in the foreground and attaches the console so that it display the service's output and error messages. It exposes its REST API on port 8083, which is mapped to the same port number on the local host. It uses Zookeeper in the container (or service) named zookeeper and Kafka brokers in the container (or service) named kafka. This command sets the three required environment variables, though you should replace their values with more meaningful values for your environment.

To start the container in detached mode, simply replace the -it option with -d. No service output will not be sent to your console, but it can be read at any time using the docker logs command. For example, the following command will display the output and keep following the output:

$ docker logs --follow --name connect

Start a shell in a running container

If you are already running a container with a Kafka Connect service, you can use this image to connect to that container and obtain a command line shell:

$ docker exec -it connect bash

where connect is the name of your existing container. The shell will be set up with all environment variables exactly like when starting the service in the container. Therefore, links to other containers and additional environment variables may be specified and will be reflected in the shell's exported variables.

Environment variables

The Debezium Kafka image uses several environment variables when running a Kafka broker using this image.

GROUP_ID

This environment variable is required when running the Kafka Connect service. Set this to an ID that uniquely identifies the Kafka Connect cluster the service and its workers belong to.

CONFIG_STORAGE_TOPIC

This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector configurations. The topic must have a single partition and be highly replicated (e.g., 3x or more).

OFFSET_STORAGE_TOPIC

This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector offsets. The topic must have a single partition and be highly replicated (e.g., 3x or more).

BOOTSTRAP_SERVERS

This environment variable is an advanced setting, used only when Kafka is not running in a linkable container or service. Set this to a list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Once a connection is established to one of these brokers, the service will then discover and make use of all Kafka brokers in the cluster, regardless of which servers are specified here for bootstrapping. The list should be in the form host1:port1,host2:port2,.... We recommend that you include more than one broker in this list, in case one of those is down.

HOST_NAME

This environment variable is an advanced setting. Set this to the hostname that the the REST API will bind to. Defaults to the hostname of the container.

ADVERTISED_HOST_NAME

This environment variable is an advanced setting. Set this to the hostname that will be given out to other workers to connect with. Defaults to the hostname of the container.

KEY_CONVERTER

This environment variable is an advanced setting. Set this to the fully-qualified name of the Java class that implements Kafka Connect's Converter class, used to convert the connector's keys to the form stored in Kafka. Defaults to org.apache.kafka.connect.json.JsonConverter.

VALUE_CONVERTER

This environment variable is an advanced setting. Set this to the fully-qualified name of the Java class that implements Kafka Connect's Converter class, used to convert the connector's values to the form stored in Kafka. Defaults to org.apache.kafka.connect.json.JsonConverter.

INTERNAL_KEY_CONVERTER

This environment variable is an advanced setting. Set this to the fully-qualified name of the Java class that implements Kafka Connect's Converter class, used to convert the connector offset and configuration keys to the form stored in Kafka. Defaults to org.apache.kafka.connect.json.JsonConverter.

INTERNAL_VALUE_CONVERTER

This environment variable is an advanced setting. Set this to the fully-qualified name of the Java class that implements Kafka Connect's Converter class, used to convert the connector offset and configuration values to the form stored in Kafka. Defaults to org.apache.kafka.connect.json.JsonConverter.

OFFSET_FLUSH_INTERVAL_MS

This environment variable is an advanced setting. Set this to the number of milliseconds defining the interval at which the service will periodically try committing offsets for tasks. The default is 60000, or 60 seconds.

OFFSET_FLUSH_TIMEOUT_MS

This environment variable is an advanced setting. Set this to the maximum time in milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. The default is 5000, or 5 seconds.

SHUTDOWN_TIMEOUT

This environment variable is an advanced setting. Set this to the number of milliseconds to wait for tasks to shutdown gracefully while the connectors complete all processing, record any final data, and clean up resources. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially. The default is 10000, or 10 seconds.

HEAP_OPTS

This environment variable is recommended. Use this to set the JVM options for the Kafka broker. By default a value of '-Xmx1G -Xms1G' is used, meaning that each Kafka broker uses 1GB of memory. Using too little memory may cause performance problems, while using too much may prevent the broker from starting properly given the memory available on the machine. Obviously the container must be able to use the amount of memory defined by this environment variable.

LOG_LEVEL

This environment variable is optional. Use this to set the level of detail for Kafka's application log written to STDOUT and STDERR. Valid values are INFO (default), WARN, ERROR, DEBUG, or TRACE."

Others

Environment variables that start with CONNECT_ will be used to update the Kafka Connect worker configuration file. Each environment variable name will be mapped to a configuration property name by:

  1. removing the CONNECT_ prefix;
  2. lowercasing all characters; and
  3. converting all '_' characters to '.' characters

For example, the environment variable CONNECT__HEARTBEAT_INTERVAL_MS is converted to the heartbeat.interval.ms property. The container will then update the Kafka Connect worker configuration file to include the property's name and value.

The value of the environment variable may not contain a '\@' character.

Ports

Containers created using this image will expose port 8083, which is the standard port bound to by the Kafka Connect service. You can use standard Docker options to map this to a different port on the host that runs the container.

Storing data

The Kafka Connect service run by this image stores no data in th econtainer, but it does produce logs. The only way to keep these files is to use volumes that map specific directories inside the container to the local file system (or to OpenShift persistent volumes).

Log files

Although this image will send Kafka Connect service log output to standard output so it is visible as Docker logs, this image also configures the Kafka Connect service to write out more logs to a data volume at /kafka/logs. All logs are rotated daily.

Configuration

This image defines a data volume at /kafka/config where the broker's configuration files are stored. Note that these configuration files are always modified based upon the environment variables and linked containers. The best use of this data volume is to be able to see the configuration files used by Kafka, although with some care it is possible to supply custom configuration files that will be adapted and used upon startup.

Docker Pull Command
Owner
debezium
Source Repository

Comments (1)
jeffyzhang
a year ago

The docker image debezium/connect seems not work as the tutorial description. I followed the tutorial and start the container but it always not show the logs expected and REST API also cannot used.
Here the log:

Using the following environment variables:
GROUP_ID=1
CONFIG_STORAGE_TOPIC=my-connect-configs
OFFSET_STORAGE_TOPIC=my-connect-offsets
BOOTSTRAP_SERVERS=172.17.0.7:9092
REST_HOST_NAME=172.17.0.8
REST_PORT=8083
ADVERTISED_HOST_NAME=10.1.234.191
ADVERTISED_PORT=8083
KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
OFFSET_FLUSH_INTERVAL_MS=60000
OFFSET_FLUSH_TIMEOUT_MS=
SHUTDOWN_TIMEOUT=10000
......
2016-07-21 11:55:46,547 WARN || The configuration config.storage.topic = my-connect-configs was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration rest.advertised.host.name = 10.1.234.191 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration rest.host.name = 172.17.0.8 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration rest.advertised.port = 8083 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration task.shutdown.graceful.timeout.ms = 10000 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration offset.flush.timeout.ms = 5000 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration internal.key.converter.schemas.enable = false was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration offset.flush.interval.ms = 60000 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration rest.port = 8083 was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration internal.key.converter = org.apache.kafka.connect.json.JsonConverter was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration key.converter.schemas.enable = true was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration internal.value.converter.schemas.enable = false was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,548 WARN || The configuration value.converter.schemas.enable = true was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,549 WARN || The configuration internal.value.converter = org.apache.kafka.connect.json.JsonConverter was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,549 WARN || The configuration offset.storage.topic = my-connect-offsets was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,549 WARN || The configuration value.converter = org.apache.kafka.connect.json.JsonConverter was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,549 WARN || The configuration key.converter = org.apache.kafka.connect.json.JsonConverter was supplied but isn't a known config. [org.apache.kafka.clients.consumer.ConsumerConfig]
2016-07-21 11:55:46,549 INFO || Kafka version : 0.9.0.1 [org.apache.kafka.common.utils.AppInfoParser]
2016-07-21 11:55:46,549 INFO || Kafka commitId : 23c69d62a0cabf06 [org.apache.kafka.common.utils.AppInfoParser]
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata