Apache Kafka
Use the Apache Kafka Connector to send data to Apache Kafka. If you are looking for information about how to create a connection to get data into Decodable from Kafka, see Apache Kafka in the Connect to a data source chapter.
Overview
Connector name | kafka |
---|---|
Type | source, sink |
Delivery guarantee | at least once |
Compatibility | Most Kafka 2.x and 3.x-compatible broker clusters |
Additional Kafka Support
If your version of Kafka is not currently supported, please contact [email protected] or join our Slack community and let us know!
Create a connection to Apache Kafka with the Apache Kafka Connector
Follow these steps to get data from or send data to Apache Kafka. These steps assume that you are using the Decodable web interface. If you prefer to use the Decodable CLI to create the connection, refer to the Property Name column for the underlying property names; a sketch of an equivalent CLI command follows these steps.
- From the Connections page, select the Apache Kafka Streaming Connector and complete the following fields.
UI Field | Property Name in the Decodable CLI | Description |
---|---|---|
Connection Type | N/A | Select source for Decodable to read data from the specified Kafka topic(s), or sink for Decodable to send data to the Kafka topic(s) provided. |
Bootstrap Servers | bootstrap.servers | A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format: <host>:<port> . The broker list cannot contain any spaces. |
Topic | topic | A semicolon-separated list of one or more topics to either read data from or write data to. |
Broker Security Protocol | security.protocol | Specify the security protocol to use when connecting to a broker. Must be one of the following: PLAINTEXT, TLS, SASL_SSL, or SASL_PLAINTEXT. See the Security section for more information about these protocols and for the additional fields that you'll need to fill out if you want to secure your connection with TLS or SASL Authentication. |
Value Format | value.format | The format for data in the Kafka topic. Must be one of the following: - JSON: Read and write JSON data based on a JSON schema. - RAW: Read and write raw (byte-based) values as a single column. - AVRO: Read and write Avro data based on an Avro schema. You can also use Avro with a schema registry. See the Avro section for more information. - Debezium (JSON): A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send CDC data through this connector. - PROTOBUF: Read and write Protobuf-formatted data based on a Protobuf schema. See the Data Type Mapping for Protobuf data section for how Decodable types map to Protobuf types. |
- Select the stream to connect: for a sink connection, the stream containing the records that you’d like to send to Apache Kafka; for a source connection, the stream that will receive records from Kafka. Then, select Next.
- Give the newly created connection a Name and Description and select Save.
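For reference, here is a sketch of what an equivalent sink connection might look like when created with the Decodable CLI. The stream ID, broker addresses, and topic name below are placeholders, and the property names are taken from the table above; adjust everything for your own account.
# Sketch only: <your-stream-id>, broker hosts, and topic are placeholders.
decodable connection create \
  --name kafka_sink_example \
  --description "Send records from a Decodable stream to Kafka" \
  --type sink \
  --stream-id <your-stream-id> \
  --connector kafka \
  --prop bootstrap.servers="broker-1.example.com:9092,broker-2.example.com:9092" \
  --prop topic="my-topic" \
  --prop security.protocol=PLAINTEXT \
  --prop value.format="json"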
Reference
Security
Decodable supports four different security protocols for accessing Kafka brokers.
SSL really means TLS
The Kafka connector (and Apache Kafka itself) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is really TLS. We do the same to avoid confusion for experienced Kafka users. That said, for the security-minded audience: it's really TLS!
Security protocol | Description |
---|---|
SASL-authenticated | Username and password authentication is used with SASL. Both TLS-encrypted (SASL_SSL) and unencrypted (SASL_PLAINTEXT) transport are supported, as well as the SCRAM and PLAIN authentication mechanisms. |
mTLS-authenticated | Two-way SSL authentication is used, so that Decodable and the Kafka broker(s) authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL. |
TLS-authenticated | One-way SSL authentication is used. The client (Decodable) holds the Kafka brokers' public certificate and uses it to verify the brokers' identity, and traffic between Decodable and the brokers is encrypted over the resulting connection. |
Unauthenticated | No authentication takes place between Decodable and the Kafka broker(s). The connection is not encrypted. |
See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:
- How to set up SASL Authentication with Kafka
- How to set up Mutual TLS Authentication with Kafka
- How to set up TLS Encryption with Kafka
SASL Properties
When configuring a Kafka connection to use SASL Authentication, the following connection properties are used:
Property | Disposition | Description |
---|---|---|
sasl.mechanism | required | Specify the SASL mechanism as configured by the Kafka broker. Supported values are ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]. |
sasl.username | required | The username or API key for authentication. |
sasl.password | required | The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information. |
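As a hedged sketch of how these properties fit together (the broker, topic, username, and secret ID below are placeholders), a SASL-authenticated source connection created with the Decodable CLI might look like the following. A complete example that combines SASL with a schema registry appears in the Avro section later on this page.
# Sketch only: broker, topic, username, and secret ID are placeholders.
# Per the table above, sasl.password refers to a Decodable secret by its ID.
decodable connection create \
  --name kafka_sasl_source \
  --type source \
  --stream-id <your-stream-id> \
  --connector kafka \
  --prop bootstrap.servers="broker.example.com:9093" \
  --prop topic="my-topic" \
  --prop security.protocol=SASL_SSL \
  --prop sasl.mechanism="SCRAM-SHA-256" \
  --prop sasl.username="my-user" \
  --prop sasl.password="<sasl-password-secret-id>" \
  --prop value.format="json"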
TLS Properties
When configuring a Kafka connection to use TLS or mTLS, the following connection properties are used:
Property | Disposition | Description |
---|---|---|
tls.broker.certificate | required | Broker's public certificate used to encrypt traffic from Decodable to external Kafka brokers. The property is only used when security.protocol=TLS. The certificate must be an X509 certificate in PEM format. |
tls.client.key | optional | The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format. If you are using the Decodable CLI, this is the ID of the secret resource corresponding to the client TLS key in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information. |
tls.client.certificate | optional | Specifies the client certificate trusted by the brokers for mTLS connections. The certificate must be an X509 certificate in PEM format. |
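As a hedged sketch (the file names, secret ID, broker, and topic below are placeholders), a TLS-secured source connection might be created along the following lines. One way to supply the PEM-encoded certificates from local files is shell command substitution; the linked how-to pages above describe the exact procedure. The tls.client.key and tls.client.certificate properties are only needed for mTLS.
# Sketch only: file paths, secret ID, broker, and topic are placeholders.
# Omit the two tls.client.* properties for one-way TLS.
decodable connection create \
  --name kafka_mtls_source \
  --type source \
  --stream-id <your-stream-id> \
  --connector kafka \
  --prop bootstrap.servers="broker.example.com:9094" \
  --prop topic="my-topic" \
  --prop security.protocol=TLS \
  --prop tls.broker.certificate="$(cat broker-certificate.pem)" \
  --prop tls.client.key="<client-key-secret-id>" \
  --prop tls.client.certificate="$(cat client-certificate.pem)" \
  --prop value.format="json"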
Avro
A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.
Plain Avro
To use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:
decodable connection get 69caa7e1
avro_test_source
id 69caa7e1
description A Kafka source connection with plain Avro schema
connector kafka
type source
stream id 865e8555
schema
0 field1 INT NOT NULL
1 field2 DOUBLE NOT NULL
2 field3 STRING NOT NULL
properties
bootstrap.servers kafka-kafka-brokers.kafka.svc.cluster.local:9092
format avro
inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
topic avro_topic
target state RUNNING
actual state RUNNING
create time 2022-02-04T19:05:19Z
update time 2022-02-08T17:04:46Z
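For reference, a connection like the one shown above could be created with a command along the following lines. This is a sketch reconstructed from the output above; your stream ID, broker address, and topic will differ.
# Sketch only: the values below are taken from the example output above.
decodable connection create \
  --name avro_test_source \
  --description "A Kafka source connection with plain Avro schema" \
  --type source \
  --stream-id 865e8555 \
  --connector kafka \
  --prop format="avro" \
  --prop topic="avro_topic" \
  --prop bootstrap.servers="kafka-kafka-brokers.kafka.svc.cluster.local:9092"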
Avro with Schema Registry
To use Avro with a schema registry, the connection property format=avro-confluent is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated against and registered with the configured schema registry.
When using this mode, the following properties are used:
Property | Required? | Description |
---|---|---|
avro-confluent.schema-registry.url | required | The URL of the schema registry to use. |
avro-confluent.basic-auth.credentials-source | optional | The source of the authentication credentials to use, if required. Allowed values are ["USER_INFO", "SASL_INHERIT"]. USER_INFO is the most common choice when schema registry authentication is enabled. |
avro-confluent.basic-auth.user-info | optional | The authentication credentials to use, if credentials are to be used. They must be specified in the form username:password. Note that once set, the credentials are kept secret and will not be visible in the connection properties output. |
avro-confluent.long-schema-id | optional | Whether to parse the schema ID as an integer or a long. If set to true, this allows reading and writing Apicurio-style payloads rather than the Confluent default. Defaults to false. |
For example, you can create a source Kafka connection that uses both SASL Authentication and a secured schema registry:
decodable connection create \
--name avro_registry_source \
--description "A Kafka source connection with SASL auth and a schema registry" \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop format="avro-confluent" \
--prop topic="my-topic" \
--prop bootstrap.servers="some.broker.cloud:9092" \
--prop security.protocol=SASL_SSL \
--prop sasl.username="my-api-key" \
--prop sasl.password="my-secret-key" \
--prop sasl.mechanism="PLAIN" \
--prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
--prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
--prop avro-confluent.basic-auth.credentials-source="USER_INFO"
Data Type Mapping for Protobuf data
If you are using the Apache Kafka connector to read or write Protobuf data, refer to the following table for how Decodable types map to Protobuf types.
Decodable Type | Protobuf Type | Notes |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | bool | |
BINARY / VARBINARY | bytes | |
INT | int32 | |
BIGINT | int64 | |
FLOAT | float | |
DOUBLE | double | |
ARRAY | repeated | Elements cannot be null; the string default value can be specified with write-null-string-literal. |
MAP | map | Keys and values cannot be null; the string default value can be specified with write-null-string-literal. |
ROW | message | |
VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | A Protobuf enum value can be mapped to either a string or a numeric Decodable column accordingly. |
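To make the mapping concrete, here is a hypothetical Protobuf definition annotated with the Decodable types it would correspond to. The message and field names are made up for illustration and are not part of any Decodable example; the shell snippet simply writes the file locally.
# Hypothetical .proto illustrating the mapping table above; nothing here is
# registered with Decodable.
cat > order_event.proto <<'EOF'
syntax = "proto3";

message OrderEvent {                     // maps to ROW
  string order_id = 1;                   // maps to STRING
  bool paid = 2;                         // maps to BOOLEAN
  bytes payload = 3;                     // maps to BINARY / VARBINARY
  int32 quantity = 4;                    // maps to INT
  int64 total_cents = 5;                 // maps to BIGINT
  float unit_weight = 6;                 // maps to FLOAT
  double unit_price = 7;                 // maps to DOUBLE
  repeated string tags = 8;              // maps to ARRAY<STRING>
  map<string, string> attributes = 9;    // maps to MAP<STRING, STRING>
}
EOF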