ontotext/kafka-sink-connector
GraphDB’s Kafka Sink Connector automates smart updates to knowledge graphs with SPARQL templates.
Kafka Sink Connector for streaming RDF updates to GraphDB, implemented on top of the Confluent Kafka platform.
For more information on the connector, please refer to the official documentation.
The version of Apache Kafka currently used in the project is 3.8.x, and the Kafka Connect worker is 7.8.x. For the compatibility matrix, please refer to the official Confluent documentation: https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
Source: https://github.com/Ontotext-AD/kafka-sink-graphdb
To run the image standalone:
$ docker pull ontotext/kafka-sink-connector:$version # Pull $version, e.g. 3.0.0
$ docker run -itd --name connect -p 8083:8083 -p 5005:5005 \
    -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \
    -e CONNECT_BOOTSTRAP_SERVERS=<url to Confluent Kafka broker> \
    -e CONNECT_REST_PORT=8083 \
    -e CONNECT_GROUP_ID=group \
    -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter \
    -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter \
    -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
    -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \
    -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
    -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
    -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_ZOOKEEPER_CONNECT=<url to ZooKeeper> \
    ontotext/kafka-sink-connector:$version
For more information on environment variable setup, refer to the official Confluent Kafka Connect configuration documentation.
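Once the container is running, a quick sanity check (assuming the port mapping above) is to query the Connect worker's REST API; it returns a JSON array of deployed connectors, empty at first:
$ curl http://localhost:8083/connectors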
The connector can also be run as part of a Docker Compose setup where all components are configured together. A compose file is provided in the official repository:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:7.5.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous
  graphdb:
    image: ontotext/graphdb:10.5.0
    hostname: graphdb
    container_name: graphdb
    ports:
      - 7200:7200
  connect-1:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-1
    container_name: connect-1
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8083:8083
      - 5005:5005
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5005
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-1
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
  connect-2:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-2
    container_name: connect-2
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8084:8083
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5006
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5006,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-2
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-2
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-2
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-2
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
  connect-3:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-3
    container_name: connect-3
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8085:8083
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5007
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5007,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-3
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-3
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
To run the sink connector, start it via Docker Compose:
$ docker compose up
This will start the connectors in distributed mode (i.e. 3 instances). If only a single instance is required:
$ docker compose up connect-1 broker graphdb zookeeper
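The Connect worker needs some time to start. One way to wait until its REST API is reachable (a simple sketch, assuming the default localhost:8083 mapping):
$ until curl -sf http://localhost:8083/connectors > /dev/null; do sleep 5; done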
When ready, create a new repository named "test" in the GraphDB instance:
$ curl 'http://localhost:7200/rest/repositories' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Content-Type: application/json;charset=UTF-8' \
-d '{"id": "test", "params": {"imports": {"name": "imports", "label": "Imported RDF files('\'';'\'' delimited)", "value": ""}, "defaultNS": {"name": "defaultNS", "label": "Default namespaces for imports('\'';'\'' delimited)", "value": ""}}, "title": "", "type": "graphdb", "location": ""}'
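To verify the repository was created, list the repositories in the GraphDB instance; the response should contain an entry with id "test":
$ curl -H 'Accept: application/json' http://localhost:7200/rest/repositories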
Then create a corresponding connector configuration:
$ curl -H 'Content-Type: application/json' --data '
{
  "name": "add-connector",
  "config": {
    "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
    "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
    "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
    "value.converter.schemas.enable": "false",
    "topics": "add",
    "tasks.max": "1",
    "offset.storage.file.filename": "/tmp/storage",
    "graphdb.server.url": "http://graphdb:7200",
    "graphdb.server.repository": "test",
    "graphdb.batch.size": 1000,
    "graphdb.batch.commit.limit.ms": 1000,
    "graphdb.auth.type": "NONE",
    "errors.tolerance": "all",
    "graphdb.update.type": "ADD",
    "graphdb.update.rdf.format": "jsonld"
  }
}' http://localhost:8083/connectors -w "\n"
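You can check that the connector and its task are in the RUNNING state via the Connect REST API:
$ curl http://localhost:8083/connectors/add-connector/status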
Once the connector is created, you can use the broker to send new records to the configured topic or topics. For more information, see the Kafka Sink Connector documentation.
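As a quick end-to-end test, you can publish a small JSON-LD document to the add topic with the console producer bundled in the broker image, then count the triples in the repository through GraphDB's SPARQL endpoint (a sketch; the payload and query are illustrative):
$ echo '{"@id": "http://example.org/1", "http://example.org/name": "test"}' | \
    docker exec -i broker kafka-console-producer --bootstrap-server broker:29092 --topic add
$ curl -G http://localhost:7200/repositories/test \
    -H 'Accept: application/sparql-results+json' \
    --data-urlencode 'query=SELECT (COUNT(*) AS ?triples) WHERE { ?s ?p ?o }'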