Apache Kafka sink connector

Features

Connector name | kafka
Delivery guarantee | At least once
Supported task sizes | S, M, L
Multiplex capability | A single instance of this connector can write to multiple topics.
Compatibility | Most Kafka 2.x and 3.x-compatible broker clusters, including Apache Kafka, Amazon Managed Streaming for Apache Kafka (MSK), and Confluent Platform. For Confluent Cloud, use the dedicated Confluent Cloud connector.
Supported stream types | Append stream, Change stream

Although the Kafka connector supports both types of stream, a single Kafka connection may only use one type of stream. If you want to send data from both change streams and append streams, you must create separate Kafka connections.

Configuration properties

Property | Description | Required | Default
-----------------------------------------------
Connection
bootstrap.servers | A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format <host>:<port>. The broker list can’t contain any spaces. | Yes |
security.protocol | The security protocol to use when connecting to a broker. Must be one of: PLAINTEXT, TLS, SASL_SSL, SASL_PLAINTEXT. See the Security protocols section for more information about these protocols, and the Security: SASL and Security: TLS properties below for the additional fields that you’ll need to fill out if you want to secure your connection with SASL authentication or TLS. | — | PLAINTEXT
Data
key.format | The format for the key, if present, in the Kafka message. Must be one of: avro, json, debezium-json. See Serialization formats for more details. | Only required when the input streams contain either a primary key or a partition key. |
value.format | The format for the value payload in the Kafka message. Must be one of: avro, json, debezium-json. See Serialization formats for more details. | — |
value.fields-include | Where primary/partition key fields should appear in the output messages. ALL: the message key and value. EXCEPT_KEY: the message key only. See the Field Inclusion Policy section for more details on setting this property. | — | ALL
Security: SASL
sasl.mechanism | The SASL mechanism as configured by the Kafka broker. Valid values are: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. | Yes |
sasl.username | The username or API key for authentication. | Yes |
sasl.password | The secret associated with your provided API key. This must be provided as a secret resource. | Yes |
Security: TLS
tls.broker.certificate | The public certificate for the broker used to encrypt traffic to Decodable. | Yes |
tls.client.key | The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format. This must be provided as a secret resource. | Only required for mTLS |
tls.client.certificate | The client certificate signed by the broker. You must use the Decodable CLI to specify the client certificate. See here for more details. | |

Topic names

By default, Decodable uses the stream name as the name of the topic it writes to. If a topic with that name already exists, Decodable writes to the existing topic. If it doesn’t exist, Decodable creates it.

You can change the name of the topic to which Decodable writes either in the web interface, or by using output-resource-name-template when calling decodable connection scan.

Writing data to multiple topics

A single instance of this connector can write to multiple topics.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to write to using decodable connection scan.

Resource specifier keys

When using the decodable connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

Name | Description
-------------------------------
topic | The topic name

Serialization formats

The following formats are supported:

Format | Description
-------------------------------
json | JSON data.
avro | Plain Avro data using the schemas of the Streams attached to the connection.
debezium-json | A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector.

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector (and Apache Kafka) implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it’s TLS! 😅

Security protocol | Description
-------------------------------
SASL-authenticated | Username and password authentication is used with SASL. Both SSL/TLS-encrypted and unencrypted (PLAINTEXT) connections are supported, as are the SCRAM and PLAIN authentication mechanisms.
mTLS-authenticated | Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.
TLS-authenticated | One-way SSL authentication is used. The client (Decodable) holds the public certificate of the server (the Kafka brokers) and uses it to verify the brokers’ identity. The connection is encrypted using SSL.
Unauthenticated | No authentication takes place between Decodable and the Kafka brokers. The connection isn’t encrypted.

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

How to set up SASL Authentication with Kafka.
How to set up TLS Encryption with Kafka.

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable streams.
This means that only data written to the stream after the connection has started will be sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection, it continues to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.

Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection will write those key fields to your topics as the message key. value.fields-include determines whether the primary or partition key fields should be included in the message value as well.

When value.fields-include = ALL, the primary or partition key fields will be written to both the message key and message value.

When value.fields-include = EXCEPT_KEY, the primary or partition key fields will only be included in the message key.

EXCEPT_KEY cannot be set when a primary or partition key does not exist in your input streams.
Example

For this example, assume an input Stream has three fields: key (the primary or partition key field), field1, and field2.

And, assume a record in the Stream is as follows:

Field Name | Value
-------------------------------
key | key_value
field1 | field1_value
field2 | field2_value

When value.fields-include = ALL, the resulting message key and message value written to your topic will be:

Message Key
{ "key": "key_value" }

Message Value
{ "key": "key_value", "field1": "field1_value", "field2": "field2_value" }

When value.fields-include = EXCEPT_KEY, the resulting message key and message value written to your topic will be:

Message Key
{ "key": "key_value" }

Message Value
{ "field1": "field1_value", "field2": "field2_value" }
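
The field inclusion policy described above can also be sketched in a few lines of Python. This is only an illustration of the documented behaviour, not Decodable's implementation: the helper name split_record and its parameters are hypothetical, and a record is modelled as a plain dict.

```python
import json


def split_record(record, key_fields, fields_include="ALL"):
    """Split a record into (message_key, message_value) dicts.

    Hypothetical helper for illustration only; not part of any Decodable API.
    """
    if fields_include not in ("ALL", "EXCEPT_KEY"):
        raise ValueError(f"unsupported value.fields-include: {fields_include}")
    if fields_include == "EXCEPT_KEY" and not key_fields:
        # Mirrors the documented rule: EXCEPT_KEY needs a primary or partition key.
        raise ValueError("EXCEPT_KEY requires a primary or partition key")

    # Key fields are always written to the message key.
    message_key = {name: record[name] for name in key_fields}

    if fields_include == "ALL":
        # ALL: key fields appear in the message value as well.
        message_value = dict(record)
    else:
        # EXCEPT_KEY: key fields are left out of the message value.
        message_value = {k: v for k, v in record.items() if k not in key_fields}
    return message_key, message_value


record = {"key": "key_value", "field1": "field1_value", "field2": "field2_value"}

for policy in ("ALL", "EXCEPT_KEY"):
    message_key, message_value = split_record(record, ["key"], policy)
    print(policy, json.dumps(message_key), json.dumps(message_value))
```

Run against the sample record, the sketch yields the same message key and message value pairs as the example for each setting, and it rejects EXCEPT_KEY when no key fields are given.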