Apache Kafka source connector

Features

Connector name:        kafka
Delivery guarantee:    Exactly once
Supported task sizes:  S, M, L
Multiplex capability:  A single instance of this connector can read from multiple topics.
Compatibility:         Most Kafka 2.x- and 3.x-compatible broker clusters, including Apache Kafka,
                       Amazon Managed Streaming for Apache Kafka (MSK), and Confluent Platform.
                       For Confluent Cloud, use the dedicated Confluent Cloud connector.

Supported stream types

- Append stream
- Change stream

Although the Kafka connector supports both types of stream, a single Kafka connection can only use one type of stream. If you want to send data from both change streams and append streams, you must create separate Kafka connections.

Configuration properties

Connection

bootstrap.servers  (required)
    A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format <host>:<port>. The broker list can't contain any spaces.

security.protocol  (default: PLAINTEXT)
    The security protocol to use when connecting to a broker. Must be one of the following: PLAINTEXT, TLS, SASL_SSL, SASL_PLAINTEXT. See the Security protocols section for more information about these protocols, and the Security: TLS and Security: SASL properties below for the additional fields you'll need to fill out if you want to secure your connection with TLS or SASL authentication.

Data

key.format
    The format for the key, if present, in the Kafka message. Must be one of: avro, json, debezium-json. See Serialization formats for more details. Only required when value.fields-include = EXCEPT_KEY; see the Field Inclusion Policy section for more details.

value.format
    The format for the value payload in the Kafka message. Must be one of: avro, json, debezium-json. See Serialization formats for more details.

value.fields-include  (default: ALL)
    Where the data written to the Decodable payload should be read from:
    - ALL: the message value only.
    - EXCEPT_KEY: the message key and value.
    See the Field Inclusion Policy section for more details on setting this property.

Advanced

parse-error-policy  (default: FAIL)
    The error handling policy. Must be one of the following:
    - FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.
    - IGNORE: Decodable ignores all invalid records; all valid records are sent. With this policy, the connection isn't stopped, and no errors or warnings are shown if there is an invalid record.

scan.startup.mode  (default: latest-offset)
    Where to start reading data when the connection is first started, or when it's restarted with its state discarded. Must be one of the following:
    - group-offsets: Start reading from the committed offsets of a specified consumer group.
    - earliest-offset: Start reading from the earliest available point in the stream.
    - latest-offset: Start reading from the latest available point in the stream.
    - timestamp: Start reading from a specific timestamp.

properties.group.id
    The group ID used by the consumer. Only required if scan.startup.mode is set to group-offsets.

scan.startup.timestamp-millis
    The timestamp from which the consumer starts scanning, in milliseconds since January 1, 1970 00:00:00.000 GMT. Only required if scan.startup.mode is set to timestamp. For one way to compute this value, see the sketch after this property group.

properties.auto.offset.reset
    What to do when there is no initial offset in Kafka (for example, a new consumer group) or if the specified offset no longer exists on the server (because of data deletion).
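The scan.startup.timestamp-millis value is a Unix epoch timestamp in milliseconds. As a minimal sketch using only the Python standard library (the date below is just a placeholder), you can compute it like this:

    from datetime import datetime, timezone

    # Placeholder: start reading from midnight UTC on 2024-01-01.
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)

    # Epoch milliseconds, suitable for scan.startup.timestamp-millis.
    timestamp_millis = int(start.timestamp() * 1000)
    print(timestamp_millis)  # 1704067200000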
Security: SASL

sasl.mechanism  (required)
    The SASL mechanism as configured by the Kafka broker. Valid values are: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

sasl.username  (required)
    The username or API key for authentication.

sasl.password  (required)
    The secret associated with your provided API key. This must be provided as a secret resource.

Security: TLS

tls.broker.certificate  (required)
    The public certificate for the broker used to encrypt traffic to Decodable.

tls.client.key  (only required for mTLS)
    The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format. This must be provided as a secret resource.

tls.client.certificate
    The client certificate signed by the broker. You must use the Decodable CLI to specify the client certificate. See here for more details.

Ingesting data from multiple topics

A single instance of this connector can read from multiple topics. If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

Name  | Description
--------------------
topic | The topic name

Serialization formats

The following formats are supported:

- json: JSON data.
- avro: Plain Avro data using the schemas of the streams attached to the connection.
- debezium-json: A unified format for changelogs, with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector.

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

Note: The Kafka connector (and Apache Kafka itself) implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's TLS! 😅

- SASL-authenticated: Username and password authentication is used with SASL. Both SSL/TLS and PLAINTEXT transports are supported, as well as the SCRAM and PLAIN authentication mechanisms.
- mTLS-authenticated: Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.
- TLS-authenticated: One-way SSL authentication is used. The client (Decodable) holds the server's (the Kafka brokers') public certificate. Data from the Kafka brokers is encrypted using the server's private key, and Decodable can decrypt it using the public certificate. Data from Decodable is encrypted using the public certificate and can only be decrypted using the Kafka broker's private key.
- Unauthenticated: No authentication takes place between Decodable and the Kafka brokers. The connection isn't encrypted.

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

- How to set up SASL Authentication with Kafka.
- How to set up TLS Encryption with Kafka.

Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by scan.startup.mode. By default this is latest-offset, so the connector reads from the end of the topic. Learn more about starting state here.
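The connection and security properties above map closely onto standard Kafka client settings, so a standalone consumer can be a convenient way to verify your broker addresses and SASL credentials before creating the connection. The following sketch uses the kafka-python library with placeholder hosts and credentials; it's purely a verification aid and doesn't reflect how Decodable connects internally.

    from kafka import KafkaConsumer

    # Placeholder brokers and credentials -- substitute your own values.
    # These settings mirror the connector's bootstrap.servers, security.protocol,
    # sasl.mechanism, sasl.username/sasl.password, and the default
    # scan.startup.mode of latest-offset.
    consumer = KafkaConsumer(
        "my-topic",
        bootstrap_servers=["broker-1.example.com:9092", "broker-2.example.com:9092"],
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-256",
        sasl_plain_username="my-user",
        sasl_plain_password="my-password",
        auto_offset_reset="latest",   # with no committed offsets, start from the end of the topic
        consumer_timeout_ms=10_000,   # stop iterating if no records arrive for 10 seconds
    )

    for record in consumer:
        print(record.topic, record.partition, record.offset, record.value)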
Field Inclusion Policy

Depending on the structure of your input messages, you may want to read the message key. The value.fields-include property determines whether an input message's key is projected into the Decodable output payload; when applicable, the key.format property describes the format of the message key.

- When value.fields-include = ALL, the message key is ignored. All output fields are sourced from the message value, including primary or partition key fields when applicable.
- When value.fields-include = EXCEPT_KEY, the message key is deserialized and its fields are projected into the output record. The structure of the message key must match the primary or partition key fields defined on the receiving stream. The rest of the output record's fields are sourced from the message value. key.format is required when value.fields-include = EXCEPT_KEY.

EXCEPT_KEY can't be set when your output streams don't have a primary or partition key.

Example

For this example, assume the following message key and message value pair is read from your topic:

Message key:

    { "key": "from_message_key" }

Message value:

    {
      "key": "from_message_value",
      "field1": "field1_value",
      "field2": "field2_value"
    }

Assume the output stream has three fields:

- key (the primary or partition key field)
- field1
- field2

When value.fields-include = ALL, the resulting Decodable payload is:

Field Name | Value
-------------------------------
key        | from_message_value
field1     | field1_value
field2     | field2_value

When value.fields-include = EXCEPT_KEY, the resulting Decodable payload is:

Field Name | Value
-------------------------------
key        | from_message_key
field1     | field1_value
field2     | field2_value
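To make the two policies concrete, the following minimal Python sketch mimics the projection behavior described above for this example. It illustrates the semantics only; it isn't Decodable's implementation.

    # Deserialized message key and value from the example above.
    message_key = {"key": "from_message_key"}
    message_value = {
        "key": "from_message_value",
        "field1": "field1_value",
        "field2": "field2_value",
    }

    def project(key: dict, value: dict, fields_include: str) -> dict:
        """Illustrative sketch of the value.fields-include policy."""
        if fields_include == "ALL":
            # The key is ignored; every output field comes from the message value.
            return dict(value)
        if fields_include == "EXCEPT_KEY":
            # Key fields come from the message key; all other fields from the value.
            record = {name: v for name, v in value.items() if name not in key}
            record.update(key)
            return record
        raise ValueError(f"unsupported value.fields-include policy: {fields_include}")

    print(project(message_key, message_value, "ALL"))
    # {'key': 'from_message_value', 'field1': 'field1_value', 'field2': 'field2_value'}
    print(project(message_key, message_value, "EXCEPT_KEY"))
    # {'field1': 'field1_value', 'field2': 'field2_value', 'key': 'from_message_key'}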