Apache Kafka sink connector

Features

Connector name

kafka

Delivery guarantee

At least once

Supported task sizes

S, M, L

A single instance of this connector can write to multiple topics.

Compatibility

Most Kafka 2.x and 3.x-compatible broker clusters including:

Supported stream types

Although the Kafka connector supports both types of stream, a single Kafka connection may only use one type of stream. If you want to send data from both change streams and append streams, then you must create separate Kafka connections.

Configuration properties

Property Description Required Default

Connection

bootstrap.servers

A comma-separated list of your Kafka brokers. You must enter at least one broker.

Enter each broker in the format: <host>:<port>.

The broker list can’t contain any spaces.

Yes

security.protocol

Specify the security protocol to use when connecting to a broker.

Must be one of the following:

  • PLAINTEXT

  • TLS

  • SASL_SSL

  • SASL_PLAINTEXT

See the Security protocols section for more information about these protocols, and for the additional fields that you’ll need to fill out if you want to secure your connection with TLS or SASL authentication.

PLAINTEXT

Data

key.format

The format for the key, if present, in the Kafka message.

Must be one of:

  • avro

  • json

  • debezium-json

See Serialization formats for more details.

Only required when the input streams contain either a primary key or a partition key.

value.format

The format for the value payload in the Kafka message.

Must be one of:

  • avro

  • json

  • debezium-json

See Serialization formats for more details.

value.fields-include

Where primary/partition key fields should appear in the output messages.

  • ALL: the message key and value

  • EXCEPT_KEY: the message key only

See the Field Inclusion Policy section for more details on setting this property.

ALL

Security: SASL

sasl.mechanism

Specify the SASL mechanism as configured by the Kafka broker. Valid values are:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Yes

sasl.username

The username or API key for authentication.

Yes

sasl.password

The password, or the secret associated with your API key.

This must be provided as a secret resource.

Yes

Security: TLS

tls.broker.certificate

The public certificate for the broker used to encrypt traffic to Decodable.

Yes

tls.client.key

The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format.

This must be provided as a secret resource.

Only required for mTLS

tls.client.certificate

The client certificate signed by the broker.

You must use the Decodable CLI to specify the client certificate. See here for more details.
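The constraints in the property table above can be checked before a connection is created. The following is a minimal sketch in Python, not part of Decodable itself: the helper name check_kafka_sink_properties is hypothetical, the host part of each broker entry is not validated, and the rule that tls.client.key and tls.client.certificate are set together for mTLS is an assumption drawn from the descriptions above.

```python
import re

# Allowed values copied from the property table above.
SECURITY_PROTOCOLS = {"PLAINTEXT", "TLS", "SASL_SSL", "SASL_PLAINTEXT"}
SASL_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}
FORMATS = {"avro", "json", "debezium-json"}

# One broker entry: <host>:<port>, no whitespace. The host part is not validated.
BROKER_ENTRY = re.compile(r"^\S+:\d{1,5}$")


def check_kafka_sink_properties(props):
    """Hypothetical helper: return a list of problems found in `props`."""
    problems = []

    servers = props.get("bootstrap.servers", "")
    if not servers:
        problems.append("bootstrap.servers: at least one broker is required")
    elif any(ch.isspace() for ch in servers):
        problems.append("bootstrap.servers: the broker list can't contain spaces")
    else:
        for entry in servers.split(","):
            if not BROKER_ENTRY.match(entry):
                problems.append(f"bootstrap.servers: {entry!r} is not <host>:<port>")

    protocol = props.get("security.protocol", "PLAINTEXT")  # documented default
    if protocol not in SECURITY_PROTOCOLS:
        problems.append(f"security.protocol: unknown value {protocol!r}")
    elif protocol.startswith("SASL_"):
        if props.get("sasl.mechanism") not in SASL_MECHANISMS:
            problems.append(
                "sasl.mechanism: must be one of " + ", ".join(sorted(SASL_MECHANISMS))
            )
        for name in ("sasl.username", "sasl.password"):
            if not props.get(name):
                problems.append(f"{name}: required for {protocol}")
    elif protocol == "TLS":
        if not props.get("tls.broker.certificate"):
            problems.append("tls.broker.certificate: required for TLS")
        # Assumption: for mTLS the client key and certificate are set together.
        if bool(props.get("tls.client.key")) != bool(props.get("tls.client.certificate")):
            problems.append("tls.client.key and tls.client.certificate: set both for mTLS")

    for name in ("key.format", "value.format"):
        if name in props and props[name] not in FORMATS:
            problems.append(f"{name}: unknown format {props[name]!r}")

    if props.get("value.fields-include", "ALL") not in {"ALL", "EXCEPT_KEY"}:
        problems.append("value.fields-include: must be ALL or EXCEPT_KEY")

    return problems
```

An empty list means no problems were found. A broker list such as "broker-1:9092, broker-2:9092" is reported because of the space after the comma.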

Topic names

By default, Decodable uses the stream name as the name of the topic it writes to. If a topic already exists with that name, Decodable will write to the existing topic. If it doesn’t exist, Decodable will create it.

You can change the name of the topic to which Decodable writes either in the web interface, or by using output-resource-name-template when calling decodable connection scan.

Writing data to multiple topics

A single instance of this connector can write to multiple topics.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to write to using decodable connection scan.

Resource specifier keys

When using the decodable connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

Name  | Description
-------------------------------
topic | The topic name

Serialization formats

The following formats are supported:

Format Description

json

JSON data.

avro

Plain Avro data using the schemas of the Streams attached to the connection.

debezium-json

A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector.
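To give a feel for what a debezium-json change record looks like, here is a minimal sketch in Python of the general Debezium envelope, which carries the row state before and after a change plus an operation code. The helper name debezium_envelope is hypothetical, and the exact set of fields that Decodable emits is not specified on this page, so treat the shape below as an assumption based on the standard Debezium envelope (before, after, op).

```python
import json

# Standard Debezium operation codes: create, update, delete.
OPS = {"c", "u", "d"}


def debezium_envelope(op, before=None, after=None):
    """Hypothetical helper: build a Debezium-style change record as a dict."""
    if op not in OPS:
        raise ValueError(f"unknown op {op!r}")
    if op == "c" and (before is not None or after is None):
        raise ValueError("a create carries only the 'after' row")
    if op == "d" and (before is None or after is not None):
        raise ValueError("a delete carries only the 'before' row")
    if op == "u" and after is None:
        raise ValueError("an update carries the 'after' row")
    return {"before": before, "after": after, "op": op}


# Serialize an insert the way it might appear in a message value.
insert = debezium_envelope("c", after={"key": "key_value", "field1": "field1_value"})
message_value = json.dumps(insert)
```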

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector (and Apache Kafka) implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it’s TLS! 😅

Security protocol Description

SASL-authenticated

Username and password authentication is used with SASL. Both SSL/TLS-encrypted and PLAINTEXT (unencrypted) connections are supported, as are the SCRAM and PLAIN authentication mechanisms.

mTLS-authenticated

Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.

TLS-authenticated

One-way SSL authentication is used. The client (Decodable) holds the public certificate of the server (the Kafka brokers) and uses it to verify the brokers’ identity. Traffic in both directions is then encrypted with session keys negotiated during the handshake.

Unauthenticated

No authentication takes place between Decodable and the Kafka brokers. The connection isn’t encrypted.

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable streams. This means that only data that’s written to the stream after the connection has started will be sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.
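The start and restart rules above can be summarized as a small decision function. This is an illustrative sketch in Python only: modelling stream positions as integer offsets, and the names StartPosition and resolve_start_offset, are assumptions made for the example rather than anything Decodable exposes.

```python
from enum import Enum
from typing import Optional


class StartPosition(Enum):
    EARLIEST = "earliest"
    LATEST = "latest"


def resolve_start_offset(
    *,
    earliest_offset: int,
    latest_offset: int,
    checkpoint_offset: Optional[int] = None,
    discard_state: bool = False,
    requested: StartPosition = StartPosition.LATEST,
) -> int:
    """Return the stream offset a sink connection would begin reading from."""
    # A restart resumes from the checkpoint unless the state is discarded.
    if checkpoint_offset is not None and not discard_state:
        return checkpoint_offset
    # A new connection, or one whose state was discarded, uses the chosen position.
    if requested is StartPosition.EARLIEST:
        return earliest_offset
    return latest_offset
```

With no checkpoint and no override, the function returns latest_offset, which matches the default described above.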

Learn more about starting state here.

Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection will write those key fields to your topics as the message key. value.fields-include determines whether the primary or partition key fields should be included in the message value as well.

When value.fields-include = ALL, the primary or partition key fields will be written to both the message key and message value.

When value.fields-include = EXCEPT_KEY, the primary or partition key fields will only be included in the message key.

EXCEPT_KEY cannot be set when a primary or partition key does not exist in your input streams.

Example

For this example, assume an input Stream has three fields:

  • key (the primary or partition key field)

  • field1

  • field2

And, assume a record in the Stream is as follows:

Field Name | Value
-------------------------------
key        | key_value
field1     | field1_value
field2     | field2_value

When value.fields-include = ALL, the resulting message key and message value written to your topic will be:

Message Key

{
    "key": "key_value"
}

Message Value

{
    "key": "key_value",
    "field1": "field1_value",
    "field2": "field2_value"
}

When value.fields-include = EXCEPT_KEY, the resulting message key and message value written to your topic will be:

Message Key

{
    "key": "key_value"
}

Message Value

{
    "field1": "field1_value",
    "field2": "field2_value"
}
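The same behaviour can be expressed as a short function. The sketch below, in Python, reproduces the example above under stated assumptions: the name split_record is hypothetical, records are modelled as plain dicts, and a record without key fields is given a message key of None.

```python
def split_record(record, key_fields, fields_include="ALL"):
    """Hypothetical helper: split a record into (message key, message value)."""
    if fields_include not in ("ALL", "EXCEPT_KEY"):
        raise ValueError("value.fields-include must be ALL or EXCEPT_KEY")
    if fields_include == "EXCEPT_KEY" and not key_fields:
        # Mirrors the rule that EXCEPT_KEY needs a primary or partition key.
        raise ValueError("EXCEPT_KEY requires a primary or partition key")

    # The key fields always go to the message key.
    message_key = {name: record[name] for name in key_fields} or None

    if fields_include == "ALL":
        message_value = dict(record)
    else:
        message_value = {
            name: value for name, value in record.items() if name not in key_fields
        }
    return message_key, message_value


record = {"key": "key_value", "field1": "field1_value", "field2": "field2_value"}
key_all, value_all = split_record(record, ["key"], "ALL")
key_except, value_except = split_record(record, ["key"], "EXCEPT_KEY")
```

Here value_all contains all three fields, while value_except contains only field1 and field2. The message key is the same in both cases.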