Apache Kafka sink connector

Use the Apache Kafka® Connector to send data to Kafka. To get data from Kafka into Decodable instead, see Apache Kafka source connector in the Connect to a data source chapter.

Features

Delivery guarantee

at least once

Compatibility

Most Kafka 2.x and 3.x-compatible broker clusters including:

Create a connection to Kafka with the Apache Kafka Connector

If you use the Decodable CLI or API to create the connection, use the names in the Property Name column for the underlying connection properties. The connector name is kafka.
  1. From the Connections page, select the Apache Kafka Streaming Connector and complete the following fields.

    UI Field

    Property Name

    Description

    Connection Type

    N/A

    Select sink so that Decodable sends data to the Kafka topic you provide.

    Bootstrap Servers

    bootstrap.servers

    A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format: <host>:<port>. The broker list can’t contain any spaces.

    Topic

    topic

    The name of the topic to send data to.

    Broker Security Protocol

    security.protocol

    Specify the security protocol to use when connecting to a broker. Must be one of the following:

    • PLAINTEXT

    • TLS

    • SASL_SSL

    • SASL_PLAINTEXT

    See the Security section for more information about these protocols and for the additional fields that you’ll need to fill out if you want to secure your connection with TLS or SASL Authentication.

    Value Format

    value.format

    The format for data in the Kafka topic. Must be one of the following:

    • JSON: Read and write JSON data based on a JSON schema.

    • RAW: Read and write raw (byte-based) values as a single column.

    • AVRO: Read and write Avro-formatted data based on an Avro schema. You can also use Avro with a schema registry. See the Avro section for more information.

    • Debezium (JSON): A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send CDC data through this connector.

  2. Select which stream contains the records that you’d like to send to Kafka. Then, select Next.

  3. Give the newly created connection a Name and Description and select Save.
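If you script connection creation, you may want to check the bootstrap.servers value before you submit it. The following Python sketch applies the rules from the table above: at least one broker, each entry in <host>:<port> form, and no spaces. The helper name parse_bootstrap_servers is hypothetical and isn't part of the Decodable CLI or API. It also assumes hostnames or IPv4 addresses, so IPv6 literals aren't handled.

```python
import re

# Hypothetical helper (not part of Decodable): one <host>:<port> entry.
# Hosts may not contain whitespace, colons, or commas (no IPv6 literals).
_BROKER = re.compile(r"^(?P<host>[^\s:,]+):(?P<port>\d{1,5})$")


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs.

    Raises ValueError if the list is empty, contains any whitespace, or has
    an entry that isn't <host>:<port> with a port between 1 and 65535.
    """
    if not value:
        raise ValueError("at least one broker is required")
    if any(ch.isspace() for ch in value):
        raise ValueError("the broker list can't contain spaces")
    brokers = []
    for entry in value.split(","):
        match = _BROKER.match(entry)
        if match is None:
            raise ValueError(f"broker {entry!r} is not in <host>:<port> format")
        port = int(match.group("port"))
        if not 1 <= port <= 65535:
            raise ValueError(f"broker {entry!r} has an invalid port")
        brokers.append((match.group("host"), port))
    return brokers
```

For example, parse_bootstrap_servers("broker-1.example.com:9092,broker-2.example.com:9092") returns two (host, port) pairs, while the same list with a space after the comma is rejected.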

Security

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector, like Kafka itself, implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL (as in SASL_SSL) even though the implementation is TLS, and we keep that naming to avoid confusing experienced Kafka users. For the security-minded audience: it’s TLS!

Security protocol

Description

SASL-authenticated

Decodable authenticates with a username and password using SASL. The connection can be encrypted with TLS (SASL_SSL) or left unencrypted (SASL_PLAINTEXT), and both the PLAIN and SCRAM authentication mechanisms are supported.

mTLS-authenticated

Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.

TLS-authenticated

One-way TLS authentication is used. Decodable holds the Kafka brokers’ public certificate and uses it to verify the brokers’ identity, but the brokers don’t authenticate Decodable. The connection between Decodable and the brokers is encrypted using TLS.

Unauthenticated

No authentication takes place between Decodable and the Kafka brokers. The connection isn’t encrypted.
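Each protocol above needs a different set of connection properties. As a rough pre-flight check, the following Python sketch summarizes the properties that the SASL properties and TLS properties sections on this page list for each security.protocol value. The helper is hypothetical, not Decodable’s own validation logic, and it assumes that mTLS means security.protocol=TLS plus tls.client.certificate and tls.client.key.

```python
# Hypothetical summary of the properties this page lists for each
# security.protocol value; a pre-flight check, not Decodable's validation.
_REQUIRED = {
    "PLAINTEXT": set(),
    "SASL_PLAINTEXT": {"sasl.mechanism", "sasl.username", "sasl.password"},
    "SASL_SSL": {"sasl.mechanism", "sasl.username", "sasl.password"},
    "TLS": {"tls.broker.certificate"},
}
# Assumption: mTLS is security.protocol=TLS plus a client certificate and key.
_MTLS_EXTRA = {"tls.client.certificate", "tls.client.key"}


def missing_security_props(props: dict, mutual_tls: bool = False) -> list:
    """Return the sorted names of required security properties that are unset."""
    protocol = props.get("security.protocol")
    if protocol not in _REQUIRED:
        raise ValueError(f"unknown security.protocol: {protocol!r}")
    required = set(_REQUIRED[protocol])
    if mutual_tls:
        if protocol != "TLS":
            raise ValueError("mTLS requires security.protocol=TLS")
        required |= _MTLS_EXTRA
    # Treat empty strings the same as missing properties.
    return sorted(name for name in required if not props.get(name))
```

An empty result means every property that this page lists for the chosen protocol is set; it doesn’t check that the values themselves are valid.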

See the following pages for specific instructions on how to create a connection to Kafka using the various security protocols:

SASL properties

When configuring a Kafka connection to use SASL authentication, the following connection properties are used. See How To: Set up SASL Authentication with Apache Kafka for an example.

UI Field

Property Name

Description

SASL Mechanism

sasl.mechanism

Specify the SASL mechanism as configured by the Kafka broker. Valid values are:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

SASL Username

sasl.username

The username or API key used to authenticate.

SASL Password

sasl.password

The ID of the secret resource containing the password. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.
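Before you create the connection, it can help to confirm that the broker accepts your SASL mechanism and credentials by using Apache Kafka’s own command-line tools. The following Python sketch renders a client.properties file for the Kafka Java client, which uses PlainLoginModule for PLAIN and ScramLoginModule for both SCRAM mechanisms. The helper kafka_client_properties is hypothetical. Note that this file holds the actual password, whereas the Decodable sasl.password property takes the ID of a secret.

```python
# Login modules that Apache Kafka's Java client uses for each SASL mechanism
# accepted by this connector.
_LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def kafka_client_properties(mechanism: str, username: str, password: str,
                            security_protocol: str = "SASL_SSL") -> str:
    """Render a client.properties file for Kafka's command-line tools (hypothetical helper)."""
    if mechanism not in _LOGIN_MODULES:
        raise ValueError(f"unsupported SASL mechanism: {mechanism}")
    if security_protocol not in ("SASL_SSL", "SASL_PLAINTEXT"):
        raise ValueError("security_protocol must be SASL_SSL or SASL_PLAINTEXT")
    for value in (username, password):
        # Keep the sketch simple: JAAS and .properties escaping isn't handled.
        if '"' in value or "\\" in value:
            raise ValueError("username and password must not contain quotes or backslashes")
    jaas = (f'{_LOGIN_MODULES[mechanism]} required '
            f'username="{username}" password="{password}";')
    lines = [
        f"security.protocol={security_protocol}",
        f"sasl.mechanism={mechanism}",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"
```

For example, write the output to client.properties and pass it to kafka-console-producer.sh with --producer.config client.properties. If the producer can write to the topic, the same mechanism, username, and password should work for the Decodable connection.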

TLS properties

When configuring a Kafka connection to use mTLS or TLS, the following connection properties are used:

UI Field

Property Name

Description

Broker Certificate

tls.broker.certificate

The broker’s public certificate, which Decodable uses to verify the external Kafka brokers when it establishes an encrypted TLS connection to them. This property is only used when security.protocol=TLS.

TLS Client Certificate

tls.client.certificate

Specify the client certificate used by Decodable to connect to external Kafka brokers with mTLS.

TLS Client Key

tls.client.key

The ID of the secret resource containing the client key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

N/A

tls.broker.signed_client_certificate

Specify the client certificate signed by the broker. This property is required when tls.client.certificate.type=CSR.
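The certificate properties above take certificate text. Assuming they expect PEM-encoded certificates (blocks that start with -----BEGIN CERTIFICATE-----), the following Python sketch checks that a file you plan to paste or pass with --prop contains at least one well-formed PEM block. It only checks the PEM framing and base64 encoding; it doesn’t parse or validate the X.509 certificate itself. The helper name pem_certificates is hypothetical.

```python
import base64
import re

# Assumption: the certificate properties expect PEM text.
_PEM_BLOCK = re.compile(
    r"-----BEGIN CERTIFICATE-----\s*(?P<body>[A-Za-z0-9+/=\s]+?)\s*"
    r"-----END CERTIFICATE-----"
)


def pem_certificates(text: str) -> list[bytes]:
    """Return the decoded (DER) bytes of every PEM certificate block in text.

    Raises ValueError if there is no block or a block isn't valid base64.
    """
    blocks = []
    for match in _PEM_BLOCK.finditer(text):
        body = "".join(match.group("body").split())  # drop line breaks
        # validate=True rejects non-base64 characters and bad padding;
        # binascii.Error is a subclass of ValueError.
        blocks.append(base64.b64decode(body, validate=True))
    if not blocks:
        raise ValueError("no PEM certificate block found")
    return blocks
```

A file that holds a certificate chain yields one entry per certificate, so you can also use the result to confirm how many certificates you are about to submit.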

Avro

Apache Avro is a popular choice for data serialization on Kafka. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

To use plain Avro, set the connection property format=avro. In this mode, Decodable generates the Avro schema from the connection schema and exposes it in the inferred_schema connection property. For example, the following output for an existing connection shows the generated schema:

decodable connection get 69caa7e1
avro_test_source
 id 69caa7e1
 description A Kafka source connection with plain Avro schema
 connector kafka
 type source
 stream id 865e8555
 schema
 0 field1 INT NOT NULL
 1 field2 DOUBLE NOT NULL
 2 field3 STRING NOT NULL
 properties
 bootstrap.servers kafka-kafka-brokers.kafka.svc.cluster.local:9092
 format avro
 inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
 topic avro_topic
 target state RUNNING
 actual state RUNNING
 create time 2022-02-04T19:05:19Z
 update time 2022-02-08T17:04:46Z
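To see how the connection schema maps to the inferred_schema value, the following Python sketch builds the same compact Avro record schema from the example’s three fields. The type mapping is hypothetical and covers only a few types. As a further assumption, it turns nullable columns into a ["null", type] union; Decodable’s own conversion may differ, so compare the result against the inferred_schema property of your connection.

```python
import json

# Hypothetical mapping from a few connection schema types to Avro primitives.
_AVRO_TYPES = {
    "BOOLEAN": "boolean",
    "INT": "int",
    "BIGINT": "long",
    "FLOAT": "float",
    "DOUBLE": "double",
    "STRING": "string",
    "BYTES": "bytes",
}


def avro_record_schema(fields, name: str = "record") -> str:
    """Build a compact Avro record schema from (field_name, type) pairs."""
    avro_fields = []
    for field_name, column_type in fields:
        normalized = " ".join(column_type.upper().split())
        not_null = normalized.endswith(" NOT NULL")
        base = normalized[: -len(" NOT NULL")] if not_null else normalized
        if base not in _AVRO_TYPES:
            raise ValueError(f"no Avro mapping for type {column_type!r}")
        avro_type = _AVRO_TYPES[base]
        # Assumption: nullable columns become a ["null", type] union.
        avro_fields.append({
            "name": field_name,
            "type": avro_type if not_null else ["null", avro_type],
        })
    schema = {"type": "record", "name": name, "fields": avro_fields}
    # Compact separators match the formatting of inferred_schema above.
    return json.dumps(schema, separators=(",", ":"))
```

Called with the example’s fields, avro_record_schema([("field1", "INT NOT NULL"), ("field2", "DOUBLE NOT NULL"), ("field3", "STRING NOT NULL")]) produces the same string as the inferred_schema property shown above.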

Avro with Schema Registry

To use Avro with a schema registry, set the connection property format=avro-confluent. In this mode, the Avro schema is still derived from the connection schema, but it’s validated against and registered with the configured schema registry.

When using this mode, the following properties are used:

Property

Required?

Description

avro-confluent.schema-registry.url

required

The URL of the schema registry to use.

avro-confluent.basic-auth.credentials-source

optional

The source of the authentication credentials, if the schema registry requires authentication. Allowed values are USER_INFO and SASL_INHERIT. USER_INFO is the most common choice when schema registry authentication is enabled.

avro-confluent.basic-auth.user-info

optional

The credentials to authenticate with, if the schema registry requires them, in the form username:password. Once set, the credentials are kept secret and aren’t shown in the connection properties output.

avro-confluent.long-schema-id

optional

Whether to parse the schema ID as an integer or a long. Set to true to read and write Apicurio-style payloads instead of the Confluent default. Defaults to false.
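Messages written in schema registry mode carry a small header in front of the Avro payload. In the Confluent wire format, that header is a magic byte (0) followed by a 4-byte big-endian schema ID. Assuming that avro-confluent.long-schema-id=true switches this to an 8-byte (long) ID, as in the Apicurio-style payloads mentioned above, the following Python sketch shows how a downstream consumer could split a message into its schema ID and Avro payload. The helper split_registry_payload is hypothetical.

```python
import struct

MAGIC_BYTE = 0


def split_registry_payload(message: bytes, long_schema_id: bool = False):
    """Split a schema-registry-framed message into (schema_id, avro_payload).

    Confluent framing: 1 magic byte + 4-byte big-endian int ID.
    Assumed long framing: 1 magic byte + 8-byte big-endian long ID.
    """
    fmt = ">Bq" if long_schema_id else ">Bi"
    header_size = struct.calcsize(fmt)  # 9 or 5 bytes, no padding with ">"
    if len(message) < header_size:
        raise ValueError("message is shorter than the schema registry header")
    magic, schema_id = struct.unpack_from(fmt, message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[header_size:]
```

Reading a message with the wrong setting typically fails on the magic byte check or yields a schema ID that doesn’t exist in the registry, which is a quick way to tell which framing a topic uses.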

For example, you can create a sink Kafka connection that uses both SASL authentication and a secured schema registry. Here, sasl.password is the ID of the secret that holds the password, as described in SASL properties:

decodable connection create \
--name avro_registry_sink \
--description "A Kafka sink connection with SASL auth and a schema registry" \
--type sink \
--stream-id 295e2a7f \
--connector kafka \
--prop format="avro-confluent" \
--prop topic="my-topic" \
--prop bootstrap.servers="some.broker.cloud:9092" \
--prop security.protocol=SASL_SSL \
--prop sasl.username="my-api-key" \
--prop sasl.password="my-password-secret-id" \
--prop sasl.mechanism="PLAIN" \
--prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
--prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
--prop avro-confluent.basic-auth.credentials-source="USER_INFO"

Connector starting state and offsets

A new sink connection starts reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started is sent to the external system. When you start the connection, you can override this and choose Earliest to send all existing data on the source stream to the target system, followed by all new data that arrives on the stream.

When you restart a sink connection, it continues reading from the point most recently stored in its checkpoint before the connection stopped. You can also discard the connection’s state and restart it afresh from Earliest or Latest, as described above.

Learn more about starting state here.
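The starting rules above can be summarized as a small conceptual model. In the following Python sketch, offsets are positions in the source stream; the function name and offset numbers are illustrative only, since Decodable doesn’t expose offsets in this form. Because delivery is at least once, records sent after the last checkpoint and before the connection stopped can be sent again on restart.

```python
from typing import Optional


def first_offset(start_position: str, records_in_stream: int,
                 checkpoint_offset: Optional[int] = None) -> int:
    """Return the stream offset a sink connection starts reading from.

    Conceptual model only. checkpoint_offset is the position saved in the
    last checkpoint, or None for a new connection or discarded state.
    """
    if checkpoint_offset is not None:
        # Restart with retained state: resume from the last checkpoint.
        # Records sent after that checkpoint may be sent again.
        return checkpoint_offset
    position = start_position.lower()
    if position == "earliest":
        return 0  # all existing records, then new ones
    if position == "latest":
        return records_in_stream  # only records written from now on
    raise ValueError("start_position must be 'earliest' or 'latest'")
```

For a stream that already holds 100 records, a new connection started from Latest begins at offset 100, one started from Earliest begins at 0, and a restarted connection whose last checkpoint was at offset 37 resumes at 37 regardless of the start position.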