Apache Kafka source connector

Features

Connector name: kafka

Delivery guarantee: Exactly once

Supported task sizes: S, M, L

A single instance of this connector can read from multiple topics.

Compatibility: Most Kafka 2.x and 3.x-compatible broker clusters.

Supported stream types: Change streams and append streams. Although the Kafka connector supports both types of stream, a single Kafka connection can only use one type of stream. If you want to send data from both change streams and append streams, then you must create separate Kafka connections.

Configuration properties

Properties are grouped by category. Each entry lists the property, its description, and, where applicable, whether it's required and its default value.

Connection

bootstrap.servers

A comma-separated list of your Kafka brokers. You must enter at least one broker.

Enter each broker in the format: <host>:<port>.

The broker list can’t contain any spaces.

Required: Yes
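
For illustration, here's a minimal sketch in Python of assembling a valid bootstrap.servers value from individual brokers. The broker host names are hypothetical.

# Build a bootstrap.servers value: comma-separated <host>:<port> pairs with no spaces.
# The broker host names below are placeholders.
brokers = [("broker-1.example.com", 9092), ("broker-2.example.com", 9092)]
bootstrap_servers = ",".join(f"{host}:{port}" for host, port in brokers)
print(bootstrap_servers)  # broker-1.example.com:9092,broker-2.example.com:9092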

security.protocol

Specify the security protocol to use when connecting to a broker.

Must be one of the following:

  • PLAINTEXT

  • TLS

  • SASL_SSL

  • SASL_PLAINTEXT

See the Security protocols section for more information about these protocols, as well as the additional fields that you'll need to fill out if you want to secure your connection with TLS or SASL authentication.

Default: PLAINTEXT

Data

key.format

The format for the key, if present, in the Kafka message.

Must be one of:

  • avro

  • json

  • debezium-json

See Serialization formats for more details.

Only required when value.fields-include = EXCEPT_KEY. See the Field Inclusion Policy section for more details.

value.format

The format for the value payload in the Kafka message.

Must be one of:

  • avro

  • json

  • debezium-json

See Serialization formats for more details.

value.fields-include

Where the fields of the Decodable record should be read from in the Kafka message:

  • ALL: read all fields from the message value only.

  • EXCEPT_KEY: read primary or partition key fields from the message key, and all other fields from the message value.

See the Field Inclusion Policy section for more details on setting this property.

Default: ALL

Advanced

parse-error-policy

Select the error handling policy. Must be one of the following:

  • FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.

  • IGNORE: Decodable ignores all invalid records and sends only the valid ones. With this policy, the connection isn't stopped, and no errors or warnings are shown if there is an invalid record.

Default: FAIL

scan.startup.mode

Specifies where to start reading data when the connection is first started, or when it’s restarted with the state discarded.

Must be one of the following:

  • group-offsets: Start reading data from the committed offsets of a specified consumer group.

  • earliest-offset: Start reading data from the earliest available point in the stream.

  • latest-offset: Start reading data from the latest available point in the stream.

  • timestamp: Start reading data from a specific timestamp.

Default: latest-offset

properties.group.id

The group ID used by the consumer.

Only required if scan.startup.mode is set to group-offsets.

scan.startup.timestamp-millis

The timestamp from which the consumer will start scanning, in milliseconds since January 1, 1970 00:00:00.000 GMT.

Only required if scan.startup.mode is set to timestamp.
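
For example, here's one way to compute a scan.startup.timestamp-millis value. This is a small Python sketch; the chosen start time is arbitrary.

from datetime import datetime, timezone

# Milliseconds since January 1, 1970 00:00:00.000 GMT for an arbitrary start time.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
timestamp_millis = int(start.timestamp() * 1000)
print(timestamp_millis)  # 1704067200000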

properties.auto.offset.reset

What to do when there is no initial offset in Kafka (like a new consumer group) or if the specified offset no longer exists on the server (because of data deletion).

Security: SASL

sasl.mechanism

Specify the SASL mechanism as configured by the Kafka broker. Valid values are:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Required: Yes

sasl.username

The username or API key for authentication.

Required: Yes

sasl.password

The password or secret associated with your provided username or API key.

This must be provided as a secret resource.

Required: Yes
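
For reference, the sketch below shows the same SASL settings as they might appear in a Kafka client application. It assumes the kafka-python package and placeholder credentials, and it isn't how you configure the Decodable connection itself; use the properties above for that.

from kafka import KafkaProducer

# Client-side equivalent of security.protocol = SASL_SSL with the SCRAM-SHA-512 mechanism.
# The broker address and credentials are placeholders.
producer = KafkaProducer(
    bootstrap_servers="broker-1.example.com:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="my-username",
    sasl_plain_password="my-password",
)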

Security: TLS

tls.broker.certificate

The public certificate for the broker used to encrypt traffic to Decodable.

Required: Yes

tls.client.key

The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format.

This must be provided as a secret resource.

Only required for mTLS
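
If your client key isn't already an unencrypted PKCS#8 key, one way to convert it is sketched below using the Python cryptography package. The file names and passphrase are placeholders; the openssl pkcs8 -topk8 -nocrypt command achieves the same result.

from cryptography.hazmat.primitives import serialization

# Load an existing (possibly encrypted) PEM private key and rewrite it
# as an unencrypted PKCS#8 PEM file. File names and passphrase are placeholders.
with open("client-key.pem", "rb") as f:
    key = serialization.load_pem_private_key(f.read(), password=b"old-passphrase")

pkcs8 = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

with open("client-key-pkcs8.pem", "wb") as f:
    f.write(pkcs8)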

tls.client.certificate

The client certificate signed by the broker.

You must use the Decodable CLI to specify the client certificate. See here for more details.

Ingesting data from multiple topics

A single instance of this connector can read from multiple topics.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

topic: The topic name.

Serialization formats

The following formats are supported:

json: JSON data.

avro: Plain Avro data using the schemas of the streams attached to the connection.

debezium-json: A unified format for changelog data, serialized as JSON. Select this format if you want to send Change Data Capture (CDC) data through this connector.
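
For orientation, the sketch below builds a simplified Debezium-style change event in Python. Real Debezium events carry additional metadata (for example a source block), and the exact fields depend on the connector that produced them.

import json

# A simplified Debezium-style change event for an update ("op": "u").
change_event = {
    "before": {"id": 42, "status": "pending"},
    "after": {"id": 42, "status": "shipped"},
    "op": "u",
    "ts_ms": 1704067200000,
}
print(json.dumps(change_event))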

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector (and Apache Kafka) implements TLS which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it’s TLS! 😅

SASL-authenticated: Username and password authentication is used with SASL. Both SSL/TLS and PLAINTEXT transports are supported, as well as the SCRAM and PLAIN authentication mechanisms.

mTLS-authenticated: Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. The connection is also encrypted using SSL.

TLS-authenticated: One-way SSL authentication is used. The client (Decodable) holds the Kafka brokers' public certificate. Data from the Kafka brokers is encrypted using the brokers' private key and Decodable can decrypt it using the public certificate. Data from Decodable is encrypted using the public certificate and can only be decrypted using the Kafka brokers' private key.

Unauthenticated: No authentication takes place between Decodable and the Kafka brokers. The connection isn't encrypted.
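
The sketch below only illustrates what one-way TLS and mTLS look like from a Kafka client's perspective; it isn't part of the Decodable configuration. It assumes the kafka-python package, and the broker address and certificate paths are placeholders.

from kafka import KafkaConsumer

# One-way TLS: the client only needs the broker's CA certificate to verify and
# encrypt the connection. For mTLS, the client also presents its own certificate and key.
consumer = KafkaConsumer(
    bootstrap_servers="broker-1.example.com:9093",
    security_protocol="SSL",
    ssl_cafile="broker-ca.pem",      # broker / CA certificate
    ssl_certfile="client-cert.pem",  # mTLS only
    ssl_keyfile="client-key.pem",    # mTLS only
)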

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

Connector starting state and offsets

When you create a connection, or restart it and discard its state, the connector reads from the position in the topic defined by the scan.startup.mode property. By default this is latest-offset, which means that the connector reads from the end of the topic.

Learn more about starting state here.

Field Inclusion Policy

Depending on the structure of your input messages, you may want to read the message key. The value.fields-include property determines whether an input message’s key is projected into the Decodable output payload; when applicable, the key.format property describes the format of the message key.

When value.fields-include = ALL, the message key is ignored. All output fields are sourced from the message value, including primary or partition key fields when applicable.

When value.fields-include = EXCEPT_KEY, the message key is deserialized and its fields are projected into the output record. The structure of a message key must match the primary or partition key fields defined on the receiving stream. The rest of the output record’s fields are sourced from the message value.

key.format is required when value.fields-include = EXCEPT_KEY.
EXCEPT_KEY cannot be set when a primary or partition key does not exist in your output streams.

Example

For this example, assume the following message key and message value pair is read from your topic:

Message Key

{
    "key": "from_message_key"
}

Message Value

{
    "key": "from_message_value",
    "field1": "field1_value",
    "field2": "field2_value"
}

Assume the output Stream has three fields:

  • key (the primary or partition key field)

  • field1

  • field2

When value.fields-include = ALL, the resulting Decodable payload will be:

Field Name | Value
-------------------------------
key        | from_message_value
field1     | field1_value
field2     | field2_value

When value.fields-include = EXCEPT_KEY, the resulting Decodable payload will be:

Field Name | Value
-------------------------------
key        | from_message_key
field1     | field1_value
field2     | field2_value
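
For context, a key/value pair like the one in this example could be produced to the topic as follows. This is a sketch that assumes the kafka-python package, a hypothetical topic name, and key.format = json and value.format = json.

import json
from kafka import KafkaProducer

# Serialize both the message key and the message value as JSON.
# The broker address and topic name are placeholders.
producer = KafkaProducer(
    bootstrap_servers="broker-1.example.com:9092",
    key_serializer=lambda d: json.dumps(d).encode("utf-8"),
    value_serializer=lambda d: json.dumps(d).encode("utf-8"),
)

producer.send(
    "orders",
    key={"key": "from_message_key"},
    value={"key": "from_message_value", "field1": "field1_value", "field2": "field2_value"},
)
producer.flush()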