Kafka Sink Connector

GraphDB’s Kafka Sink Connector automates smart updates to knowledge graphs with SPARQL templates. It streams RDF updates to GraphDB and is implemented on top of the Confluent Kafka platform.

For more information on the connector, please refer to the official documentation.

The project currently uses Apache Kafka 3.8.x, and the Kafka Connect worker is based on Confluent Platform 7.8.x. For the compatibility matrix, please refer to the official Confluent documentation: https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility

Source: https://github.com/Ontotext-AD/kafka-sink-graphdb

Running the image standalone

To run the image standalone:

$ docker pull ontotext/kafka-sink-connector:$version # Pull a specific version, e.g. 3.0.0
$ docker run -itd --name connect -p 8083:8083 -p 5005:5005 \
      -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \
      -e CONNECT_BOOTSTRAP_SERVERS=<url to Confluent Kafka broker> \
      -e CONNECT_REST_PORT=8083 \
      -e CONNECT_GROUP_ID=group \
      -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter \
      -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter \
      -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
      -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \
      -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
      -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
      -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_ZOOKEEPER_CONNECT=<url to ZooKeeper> \
      ontotext/kafka-sink-connector:$version

For more information on setting up the environment variables, refer to the official Confluent Kafka Connect configuration documentation.
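
Once the worker starts, you can verify that it is running by querying the Kafka Connect REST API on the exposed port. The call returns a JSON list of registered connectors, which is empty ([]) on a fresh worker:

$ curl http://localhost:8083/connectors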

Running the image with docker-compose

The connector can also be run as part of a Docker Compose setup where all components are configured together. The following compose file is provided in the official repository:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:7.5.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous

  graphdb:
    image: ontotext/graphdb:10.5.0
    hostname: graphdb
    container_name: graphdb
    ports:
      - 7200:7200

  connect-1:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-1
    container_name: connect-1
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8083:8083
      - 5005:5005
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5005
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-1
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  connect-2:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-2
    container_name: connect-2
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8084:8083
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5006
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5006,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-2
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-2
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-2
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-2
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  connect-3:
    image: ontotext/kafka-sink-connector:3.0.0
    hostname: connect-3
    container_name: connect-3
    depends_on:
      - zookeeper
      - broker
      - graphdb
    ports:
      - 8085:8083
    environment:
      # Uncomment the line below to configure remote JVM debug on port 5007
      # JAVA_TOOL_OPTIONS: -agentlib:jdwp=transport=dt_socket,address=*:5007,server=y,suspend=n
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-3
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group-3
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs-3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets-3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

To run the sink connector, start it via docker-compose:

$ docker compose up 

This will start the connectors in distributed mode (i.e. 3 instances). If only a single instance is required:

$ docker compose up connect-1 broker graphdb zookeeper
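
To confirm that all services are up before continuing, list their state with standard Compose tooling:

$ docker compose ps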

When the services are ready, create a new repository named test in the GraphDB instance:

$ curl 'http://localhost:7200/rest/repositories' \
                        -H 'Accept: application/json, text/plain, */*' \
                        -H 'Content-Type: application/json;charset=UTF-8' \
                        -d '{"id": "test", "params": {"imports": {"name": "imports", "label": "Imported RDF files('\'';'\'' delimited)", "value": ""}, "defaultNS": {"name": "defaultNS", "label": "Default namespaces for imports('\'';'\'' delimited)", "value": ""}}, "title": "", "type": "graphdb", "location": ""}'

Then create a corresponding connector configuration:

curl -H 'Content-Type: application/json' --data '
    {
        "name": "add-connector",
        "config": {
            "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
            "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
            "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
            "value.converter.schemas.enable": "false",
            "topics": "add",
            "tasks.max": "1",
            "errors.tolerance": "all",
            "graphdb.server.url": "http://graphdb:7200",
            "graphdb.server.repository": "test",
            "graphdb.batch.size": 1000,
            "graphdb.batch.commit.limit.ms": 1000,
            "graphdb.auth.type": "NONE",
            "graphdb.update.type": "ADD",
            "graphdb.update.rdf.format": "jsonld"
        }
    }' http://localhost:8083/connectors -w "\n"
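
After the POST succeeds, the connector's state can be checked through the standard Kafka Connect REST API; both the connector and its task should report RUNNING:

$ curl http://localhost:8083/connectors/add-connector/status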

Once the connector is running, you can use the broker to send new records to the configured topic or topics. For more information, see the Kafka Sink Connector documentation.
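
For example, a record can be produced from inside the broker container with the console producer. The JSON-LD document below is a hypothetical sample; with the configuration above, its statements should be added to the test repository:

$ docker exec -i broker kafka-console-producer --bootstrap-server broker:29092 --topic add <<'EOF'
{"@id": "http://example.org/resource/1", "http://example.org/title": "Example document"}
EOF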
