Apache Kafka sink connector

Features

Connector name

kafka

Delivery guarantee

At least once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can write to a single topic.

Compatibility

Most Kafka 2.x and 3.x-compatible broker clusters.

Supported stream types

Append stream

Configuration properties

Property Description Required Default

Connection

bootstrap.servers

A comma-separated list of your Kafka brokers. You must enter at least one broker.

Enter each broker in the format: <host>:<port>.

The broker list can’t contain any spaces.

Yes

security.protocol

Specify the security protocol to use when connecting to a broker.

Must be one of the following:

  • PLAINTEXT

  • TLS

  • SASL_SSL

  • SASL_PLAINTEXT

See the Security protocols section for more information about these protocols and for the additional fields that you’ll need to fill out if you want to secure your connection with TLS or SASL authentication.

PLAINTEXT
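
The bootstrap.servers rules above (comma-separated <host>:<port> entries, at least one broker, no spaces) can be checked before you create a connection. The following Python sketch is illustrative only: the function name validate_bootstrap_servers is hypothetical and isn’t part of Decodable or Kafka, and the port range check is an added assumption.

```python
def validate_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Parse a bootstrap.servers value into (host, port) pairs or raise ValueError."""
    if not value:
        raise ValueError("at least one broker is required")
    if any(ch.isspace() for ch in value):
        raise ValueError("the broker list can't contain any spaces")

    brokers = []
    for entry in value.split(","):
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not (port.isascii() and port.isdigit()):
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        port_number = int(port)
        # Assumption: reject ports outside the valid TCP range.
        if not 1 <= port_number <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        brokers.append((host, port_number))
    return brokers
```

For example, validate_bootstrap_servers("broker1.example.com:9092,broker2.example.com:9092") returns two (host, port) pairs, while a value with a space after the comma raises ValueError.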

Data

topic

The name of the topic to write data to.

Yes

key.format

The format for the key, if present, in the Kafka message.

Must be one of:

See Serialization formats for more details.

key.fields

A list of the key’s field names, delimited by semicolons, that comprise the message key.

For nested fields, specify only the root key here and define the nested fields as part of the connection’s schema.

value.format

The format for the value payload in the Kafka message.

Must be one of:

  • avro

  • avro-confluent (see below for further properties)

  • json

  • debezium-json

  • debezium-avro-confluent

  • raw

See Serialization formats for more details.
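
As a rough illustration of the key.fields and value.format rules above, the following Python sketch splits a semicolon-delimited key.fields value and checks a value.format against the list of supported values. The function names are hypothetical, and the check that every key field is a top-level field of the connection schema is an assumption based on the note about nested fields.

```python
# Supported value.format values, copied from the list above.
VALUE_FORMATS = {
    "avro", "avro-confluent", "json",
    "debezium-json", "debezium-avro-confluent", "raw",
}


def parse_key_fields(value: str) -> list[str]:
    """Split a key.fields value on semicolons and reject empty names."""
    fields = value.split(";")
    if any(not name for name in fields):
        raise ValueError(f"empty field name in key.fields: {value!r}")
    return fields


def check_data_properties(key_fields: str, value_format: str,
                          schema_fields: list[str]) -> list[str]:
    """Return the parsed key fields after validating them and the value format."""
    if value_format not in VALUE_FORMATS:
        raise ValueError(f"unsupported value.format: {value_format!r}")
    fields = parse_key_fields(key_fields)
    # Assumption: key fields name root-level fields of the connection schema.
    unknown = [name for name in fields if name not in schema_fields]
    if unknown:
        raise ValueError(f"key fields not in the connection schema: {unknown}")
    return fields
```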

Format: Avro

avro-confluent.schema-registry.url

The URL of the schema registry to use.

Yes

avro-confluent.basic-auth.credentials-source

The source of the authentication credentials to use, if required.

Must be one of:

  • USER_INFO

  • SASL_INHERIT

USER_INFO is the most common choice when schema registry authentication is enabled.

avro-confluent.basic-auth.user-info

The authentication credentials to use, if credentials are to be used.

This must be provided as a secret resource. The contents should be in the form username:password.

avro-confluent.long-schema-id

Whether to parse the schema ID as a LONG.

Set this to true for reading and writing Apicurio-style payloads rather than the Confluent default.

If set to false, the schema ID is read as an INTEGER.

false
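
To make the effect of avro-confluent.long-schema-id more concrete, the following Python sketch frames a message header with either a 4-byte or an 8-byte schema ID. It assumes the commonly documented layout of a zero magic byte followed by a big-endian schema ID and then the Avro payload. The function names are hypothetical, and this isn’t Decodable’s implementation.

```python
import struct

MAGIC_BYTE = 0  # Assumption: both payload styles start with a zero magic byte.


def _header_format(long_schema_id: bool) -> str:
    # ">" means big-endian with no padding; "i" is 4 bytes, "q" is 8 bytes.
    return ">bq" if long_schema_id else ">bi"


def encode_header(schema_id: int, long_schema_id: bool = False) -> bytes:
    """Build the header that precedes the Avro-encoded payload."""
    return struct.pack(_header_format(long_schema_id), MAGIC_BYTE, schema_id)


def decode_header(message: bytes, long_schema_id: bool = False) -> tuple[int, bytes]:
    """Return (schema_id, avro_payload) from a framed message."""
    fmt = _header_format(long_schema_id)
    size = struct.calcsize(fmt)
    if len(message) < size:
        raise ValueError("message is shorter than the expected header")
    magic, schema_id = struct.unpack(fmt, message[:size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[size:]
```

Under these assumptions the header is 5 bytes long when the setting is false and 9 bytes long when it is true, so a producer and consumer that disagree on the setting will misread the payload boundary.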

Security: SASL

sasl.mechanism

Specify the SASL mechanism as configured by the Kafka broker. Valid values are:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Yes

sasl.username

The username or API key for authentication.

Yes

sasl.password

The password or secret associated with your username or API key.

This must be provided as a secret resource.

Yes

Security: TLS

tls.broker.certificate

The broker’s public certificate, used to encrypt traffic between the broker and Decodable.

Yes

tls.client.key

The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format.

This must be provided as a secret resource.

Only required for mTLS

tls.client.certificate

The client certificate signed by the broker.

You must use the Decodable CLI to specify the client certificate. See here for more details.
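
The tls.client.key requirement above (an unencrypted key in PKCS#8 format) can be checked locally before you upload the key as a secret. The following Python sketch assumes the key is PEM-encoded and inspects only the first-line label, using the standard PEM labels for private keys. The function name classify_private_key is hypothetical.

```python
def classify_private_key(pem_text: str) -> str:
    """Classify a PEM private key by its BEGIN label."""
    lines = pem_text.strip().splitlines()
    label = lines[0].strip() if lines else ""

    if label == "-----BEGIN PRIVATE KEY-----":
        return "pkcs8-unencrypted"  # The form the connector asks for.
    if label == "-----BEGIN ENCRYPTED PRIVATE KEY-----":
        return "pkcs8-encrypted"    # PKCS#8, but protected by a passphrase.
    if label.startswith("-----BEGIN ") and label.endswith(" PRIVATE KEY-----"):
        return "not-pkcs8"          # For example "RSA PRIVATE KEY" or "EC PRIVATE KEY".
    return "unknown"
```

Only a key classified as pkcs8-unencrypted matches the description above; other results suggest the key needs converting first.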

Serialization formats

The following formats are supported:

Format Description

json

JSON data

raw

Raw (byte-based) values as a single column.

avro

Plain Avro data using the schema defined when you create the connection

avro-confluent

Avro data using a predefined schema stored in a schema registry.

debezium-json

A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector.

debezium-avro-confluent

A unified format schema for changelogs with additional support for serializing messages using Avro with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector.

Avro

A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

When you use plain Avro (without the schema held in a schema registry), the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

$ decodable connection get 69caa7e1

avro_test_sink
 id 69caa7e1
 description A Kafka sink connection with plain Avro schema
 connector kafka
 type sink
 stream id 865e8555
 schema
 0 field1 INT NOT NULL
 1 field2 DOUBLE NOT NULL
 2 field3 STRING NOT NULL
 properties
 bootstrap.servers broker.svc.cluster.local:9092
 format avro
 inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
 topic avro_topic
[…]
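
To show how the inferred_schema value in this output relates to the connection schema, the following Python sketch derives the same Avro record from the three fields. The type mapping covers only the types in this example and handles only NOT NULL fields. It is an assumption for illustration, not Decodable’s actual mapping, and the function name is hypothetical.

```python
import json

# Assumed mapping for the three types that appear in the example output.
SQL_TO_AVRO = {"INT": "int", "DOUBLE": "double", "STRING": "string"}


def infer_avro_schema(fields: list[tuple[str, str]]) -> str:
    """Build a compact Avro record schema from (name, SQL type) pairs."""
    avro_fields = []
    for name, sql_type in fields:
        if not sql_type.endswith(" NOT NULL"):
            raise ValueError(f"this sketch only handles NOT NULL fields: {name}")
        base_type = sql_type[: -len(" NOT NULL")]
        if base_type not in SQL_TO_AVRO:
            raise ValueError(f"no mapping for type {base_type!r}")
        avro_fields.append({"name": name, "type": SQL_TO_AVRO[base_type]})

    schema = {"type": "record", "name": "record", "fields": avro_fields}
    # Compact separators match the whitespace-free form shown in the CLI output.
    return json.dumps(schema, separators=(",", ":"))


print(infer_avro_schema([
    ("field1", "INT NOT NULL"),
    ("field2", "DOUBLE NOT NULL"),
    ("field3", "STRING NOT NULL"),
]))
```

Run as written, this prints the same JSON string as the inferred_schema property in the output above.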

Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry.

To use Avro with a schema registry such as Confluent’s, additional connection properties are required. You can specify these when you create the connection with the CLI or API. If you create the connection through the UI, you will need to use the CLI or API to update the connection and add the necessary properties.

For example, to use the CLI to create a Kafka sink connection that writes Avro messages using a secured schema registry:

decodable connection create                                                           \
  --name avro_registry_sink                                                           \
  --description "A Kafka sink connection with SASL auth and a schema registry"        \
  --type sink                                                                         \
  --stream-id 295e2a7f                                                                \
  --connector kafka                                                                   \
  --prop bootstrap.servers="some.broker.cloud:9092"                                   \
  --prop topic="my-topic"                                                             \
  --prop value.format=avro-confluent                                                  \
  --prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud"        \
  --prop avro-confluent.basic-auth.credentials-source=USER_INFO                       \
  --prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password"
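
The user-info value in this command has the form username:password. As a hedged illustration of what USER_INFO credentials typically turn into, the following Python sketch builds an HTTP Basic Authorization header from such a value, which is how Confluent-compatible registries commonly accept credentials. The function name is hypothetical, and Decodable’s internal handling may differ.

```python
import base64


def basic_auth_header(user_info: str) -> dict[str, str]:
    """Turn a username:password string into an HTTP Basic auth header."""
    # Split on the first colon only, since the password may contain colons.
    username, sep, _password = user_info.partition(":")
    if not sep or not username:
        raise ValueError("expected credentials in the form username:password")
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```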

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector (and Apache Kafka) implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it’s TLS! 😅

Security protocol Description

SASL-authenticated

Username and password authentication is used with SASL. Both SSL/TLS-encrypted and PLAINTEXT (unencrypted) connections are supported, as well as the SCRAM and PLAIN authentication mechanisms.

mTLS-authenticated

Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.

TLS-authenticated

One-way SSL authentication is used. The client (Decodable) holds the server’s (Kafka brokers’) public certificate and uses it to verify the brokers’ identity. The brokers don’t authenticate Decodable. Traffic in both directions is encrypted with session keys negotiated during the TLS handshake.

Unauthenticated

No authentication takes place between Decodable and the Kafka brokers. The connection isn’t encrypted.
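
The relationship between security.protocol and the properties marked as required in the configuration tables can be sketched as a simple lookup. The following Python example is a rough illustration. It assumes that the SASL properties apply to both SASL_SSL and SASL_PLAINTEXT, that tls.broker.certificate applies to TLS, and that mTLS means TLS with both client properties set. The names used are hypothetical.

```python
SASL_PROPERTIES = ("sasl.mechanism", "sasl.username", "sasl.password")
MTLS_PROPERTIES = ("tls.client.key", "tls.client.certificate")

# Assumed mapping from security.protocol to its additional required properties.
REQUIRED_BY_PROTOCOL = {
    "PLAINTEXT": (),
    "TLS": ("tls.broker.certificate",),
    "SASL_SSL": SASL_PROPERTIES,
    "SASL_PLAINTEXT": SASL_PROPERTIES,
}


def missing_properties(props: dict[str, str]) -> list[str]:
    """List required properties that are absent or empty for the chosen protocol."""
    protocol = props.get("security.protocol", "PLAINTEXT")  # Documented default.
    if protocol not in REQUIRED_BY_PROTOCOL:
        raise ValueError(f"unsupported security.protocol: {protocol!r}")

    required = ["bootstrap.servers", "topic", *REQUIRED_BY_PROTOCOL[protocol]]
    # Assumption: for mTLS, supplying one client property requires the other.
    if protocol == "TLS" and any(props.get(name) for name in MTLS_PROPERTIES):
        required.extend(MTLS_PROPERTIES)
    return [name for name in required if not props.get(name)]
```

An empty result means that, under these assumptions, the property set is complete for the selected protocol.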

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data that’s written to the stream after the connection has started will be sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.
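
The start and restart rules above can be summarized as a small decision function. The following Python sketch models stream positions as integer offsets purely for illustration. The function name, its parameters, and the offset model are hypothetical and don’t reflect how Decodable stores its state.

```python
from typing import Optional


def resolve_start_position(
    checkpoint: Optional[int],
    earliest: int,
    latest: int,
    start_from: str = "Latest",
    discard_state: bool = False,
) -> int:
    """Pick the position a sink connection would start reading from."""
    # A restart resumes from the last checkpoint unless the state is discarded.
    if checkpoint is not None and not discard_state:
        return checkpoint
    # A new connection, or one with discarded state, uses the chosen starting point.
    if start_from == "Earliest":
        return earliest
    if start_from == "Latest":
        return latest
    raise ValueError(f"start_from must be 'Earliest' or 'Latest', got {start_from!r}")
```

In this model a new connection has no checkpoint and defaults to Latest, while a restarted connection ignores start_from unless its state is discarded.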

Learn more about starting state here.