Apache Kafka

Use the Apache Kafka connector to get data from a Kafka topic into Decodable. If you are looking for information about how to create a connection to get data into Kafka from Decodable, see Apache Kafka in the Connect to a data destination chapter.

Overview

Type: source, sink
Connector name: kafka
Delivery guarantee: at least once
Compatibility: Most Kafka 2.x and 3.x-compatible broker clusters, including:

Create a connection to Apache Kafka

Follow these steps to get data from Apache Kafka. These steps assume that you are using the Decodable web interface. However, if you want to use the Decodable CLI to create the connection, refer to the Property Name column for the underlying property names. An example property set is sketched after the steps below.

  1. From the Connections page, select the Apache Kafka Streaming Connector and complete the following fields.
UI Field (Property Name in the Decodable CLI): Description

Connection Type (N/A): Select source to use this connector to get data into Decodable.

Bootstrap Servers (bootstrap.servers): A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format <host>:<port>. The broker list cannot contain any spaces.

Topic (topic): A semicolon-separated list of one or more topics to get data from.

Broker Security Protocol (security.protocol): The security protocol to use when connecting to a broker. Must be one of the following:

- PLAINTEXT
- TLS
- SASL_SSL
- SASL_PLAINTEXT

See the Supported security protocols section for more information about these protocols and for the additional fields that you'll need to fill out if you want to secure your connection with TLS or SASL authentication.

Value Format (value.format): The format for data in the Kafka topic. Must be one of the following:

- JSON: Read and write JSON data based on a JSON schema.
- RAW: Read and write raw (byte-based) values as a single column.
- AVRO: Read and write Avro data based on an Avro schema. You can also use Avro with a schema registry. See the Avro section for more information.
- Debezium (JSON): A unified format for changelogs, serialized as JSON. Select this option if you want to send CDC data through this connector.
- PROTOBUF: Read and write Protobuf-formatted data based on a Protobuf schema. See Data Type Mapping for Protobuf data for how Decodable types map to Protobuf types.

Parse Error Policy (parse-error-policy): The error handling policy. Must be one of the following:

- FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.
- IGNORE: Decodable discards invalid records and sends only records that pass validation. With this policy, the connection is not stopped, and no errors or warnings are shown when an invalid record is encountered.

Consumer Group ID (properties.group.id): The group ID used by the consumer. This property is only required when Scan Startup Mode is set to group-offsets.

Scan Startup Mode (scan.startup.mode): Specifies where in the topic to start reading data when the connection is first started or when it's restarted. Must be one of the following:

- group-offsets: Start reading data from the committed offsets of the specified consumer group.
- earliest-offset: Start reading data from the earliest available point in the stream.
- latest-offset: Start reading data from the latest available point in the stream.
- timestamp: Start reading data from a specific timestamp.
- specific-offset: Start reading data from a specific offset value.

Consumer Offset Reset Strategy (properties.auto.offset.reset): What to do when there is no initial offset in Kafka (for example, a new consumer group) or if the specified offset no longer exists on the server (for example, because that data has been deleted).
  2. Select the stream that you’d like to connect to this connector. This will be the stream that receives events from Apache Kafka. Then, select Next.
  3. Define the connection’s schema. Select New Schema to manually enter the fields and field types present, or Import Schema if you want to paste the schema in the form of an Avro schema or JSON array.
    1. The stream's schema must match the schema of the data that you plan on sending through this connection.
    2. For more information about creating a stream or defining the stream schema, see Create a Stream.
  4. Select Next when you are finished defining the connection’s schema.
  5. Give the newly created connection a Name and Description and select Save.
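
For reference, the table in step 1 maps each UI field to the property name used by the Decodable CLI. The following is a minimal sketch of the property set a simple, unsecured source connection might use; the broker addresses and topic name are placeholders, and the exact casing of values such as the format name should be confirmed against the CLI help before use.

bootstrap.servers=broker-1.example.com:9092,broker-2.example.com:9092
topic=orders
security.protocol=PLAINTEXT
value.format=JSON
parse-error-policy=FAIL
scan.startup.mode=earliest-offset

Note that properties.group.id is omitted here: as described above, it is only required when scan.startup.mode is set to group-offsets.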

Reference

Data Type Mapping for Protobuf data

If you are using the Apache Kafka connector to read Protobuf data, refer to the following table for how Decodable types map to Protobuf types.

Decodable Type -> Protobuf Type (Notes)

- CHAR / VARCHAR / STRING -> string
- BOOLEAN -> bool
- BINARY / VARBINARY -> bytes
- INT -> int32
- BIGINT -> int64
- FLOAT -> float
- DOUBLE -> double
- ARRAY -> repeated. Elements cannot be null; the string default value can be specified with write-null-string-literal.
- MAP -> map. Keys and values cannot be null; the string default value can be specified with write-null-string-literal.
- ROW -> message
- VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT -> enum. A Protobuf enum value can be mapped to a string or numeric field in the Decodable row accordingly.
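
As an illustration of this mapping, a hypothetical Protobuf message with the fields string device_id, int64 reading_time, double measurement, bool is_valid, and repeated string tags could be read into a Decodable schema whose fields are declared with the following types (the message and field names are made up for this example):

device_id STRING
reading_time BIGINT
measurement DOUBLE
is_valid BOOLEAN
tags ARRAY<STRING>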

Extract message metadata

When consuming records from Apache Kafka in Decodable, you can access specific metadata fields associated with each Kafka record. A metadata field is identified by a string-based key and an associated data type. For example, the Apache Kafka connector has a metadata field with the key timestamp and data type TIMESTAMP_LTZ(3). See Available metadata for a list of metadata fields that you have access to.

A metadata field is declared with its data type using the following syntax:

{datatype} METADATA [FROM '{key}']

Thus, a metadata field is indicated by the METADATA keyword, with an optional FROM to provide the key. If the key is not provided explicitly with FROM, it defaults to the name of the field.

For example:

TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
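
As a slightly fuller sketch, a stream schema could expose several pieces of Kafka metadata alongside the payload fields. The field names below are arbitrary; where a field name matches the metadata key (partition, offset), the FROM clause is omitted and the key is inferred from the field name, as described above.

kafka_topic STRING NOT NULL METADATA FROM 'topic'
partition INT NOT NULL METADATA
offset BIGINT NOT NULL METADATA
event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'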

Available metadata

When you connect to Apache Kafka as a data source, you have access to the following metadata fields.

Key (Data Type): Description

topic (STRING NOT NULL): Topic name of the Kafka record.
partition (INT NOT NULL): Partition ID of the Kafka record.
headers (MAP<STRING, BYTES> NOT NULL): Headers of the Kafka record as a map of raw bytes.
leader-epoch (INT NULL): Leader epoch of the Kafka record, if available.
offset (BIGINT NOT NULL): Offset of the Kafka record in the partition.
timestamp (TIMESTAMP_LTZ(3) NOT NULL): Timestamp of the Kafka record.
timestamp-type (STRING NOT NULL): Timestamp type of the Kafka record. One of:
  • NoTimestampType
  • CreateTime (also set when writing metadata)
  • LogAppendTime

Supported security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

SSL really means TLS
The Kafka connector (and Apache Kafka itself) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is really TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's really TLS!

SASL-authenticated: Username and password authentication is used with SASL. Both SSL/TLS-encrypted and PLAINTEXT transports are supported, as well as the SCRAM and PLAIN authentication mechanisms.

mTLS-authenticated: Two-way SSL authentication is used, so that Decodable and the Kafka broker(s) authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.

TLS-authenticated: One-way SSL authentication is used. The client (Decodable) holds the public certificate of the Kafka broker(s). Data from the Kafka broker(s) is encrypted using the server's private key and Decodable can decrypt it using the public certificate. Data from Decodable is encrypted using the public certificate and can only be decrypted using the Kafka broker's private key.

Unauthenticated: No authentication takes place between Decodable and the Kafka broker(s). The connection is not encrypted.

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

SASL Properties

When configuring a Kafka connection to use SASL authentication, the following connection properties are used.

UI Field (Property Name in the Decodable CLI): Description

SASL Mechanism (sasl.mechanism): The SASL mechanism as configured by the Kafka broker. The following mechanisms are supported:

- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512

SASL Username (sasl.username): The username or API key for authentication.

SASL Password (sasl.password): The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information.
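
Putting these properties together, a SASL-secured source connection might carry a property set along the following lines. This is a sketch only: the username is a placeholder, when using the Decodable CLI the sasl.password value is the ID of a secret resource rather than the secret itself, and depending on your brokers the TLS-related properties from the next section may also be needed.

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.username=my-kafka-user
sasl.password=<ID of the secret resource holding the password>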

TLS Properties

When configuring a Kafka connection to use TLS or mTLS authentication, the following connection properties are used.

UI Field (Property Name in the Decodable CLI): Description

Broker Certificate (tls.broker.certificate): The public certificate for the broker, used to encrypt traffic to Decodable.

TLS Client Certificate Type (tls.client.key): The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format.

If you are using the Decodable CLI, this is the ID of the secret resource corresponding to the client TLS key in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information.
N/A (tls.broker.signed_client_certificate): Specifies the client certificate signed by the broker.

This property is required when the tls.client.certificate.type is set to CSR. You must use the Decodable CLI to specify the client certificate. See Option 2: Using Certificate Sign Request in the How To: Set up Mutual TLS Authentication with Kafka section for more information.
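
For example, an mTLS-authenticated connection combines the broker certificate with a client key held as a secret. The sketch below uses placeholder values and assumes the TLS broker security protocol; tls.broker.signed_client_certificate would additionally be set only when following the CSR flow referenced above.

security.protocol=TLS
tls.broker.certificate=<PEM-encoded broker public certificate>
tls.client.key=<ID of the secret resource holding the PKCS#8 client key>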