Apache Kafka
Decodable supports both sourcing data from, and sinking data to, Apache Kafka, which is an open-source distributed event streaming platform optimized for ingesting and processing streaming data in real-time. Kafka provides three main functions to its users:
- Publish and subscribe to streams of records
- Effectively store streams of records in the order in which records were generated
- Process streams of records in real time
Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storage and analysis of both historical and real-time data.
Getting Started
Connections come in two flavors: source and sink. Source connections read from an external system and write to a Decodable stream, while sink connections read from a stream and write to an external system. Kafka connectors can be used in either role, performing as either a source or a sink. The configuration is almost identical, except that source connectors allow you to define a policy for handling parse errors in the incoming data stream.
Configure As A Source
To create and configure a connector for Kafka, sign in to the Decodable Web Console, navigate to the Connections tab, click on New Connection, and follow the steps below. For examples of using the command line tools or scripting, see the How To guides.
- Select a connector type, either source or sink, which determines whether data is streamed into Decodable for processing by one or more pipelines, or streamed out to an external system.
- Provide a list of one or more Kafka brokers to use to bootstrap connections. Each broker is specified as a hostname/port pair separated by a colon (e.g., broker-hostname-a:9092). Multiple brokers must be separated by commas with no spaces.
- Provide a semicolon-separated list of one or more topic names to read data from when the connection type is source, or to write data to when the connection type is sink.
- Select a broker security protocol, which can be one of the following: PLAINTEXT, TLS, SASL_SSL, or SASL_PLAINTEXT. For more information about Kafka security, see the Security section under Reference below.
  - When using TLS, you must also provide:
    - the broker's public certificate, which is used to encrypt traffic from Decodable to external Kafka brokers
    - the client certificate type used to authenticate Kafka clients in Decodable, which can be one of the following: none, SELF_SIGNED, or CSR. For mutual TLS authentication, specifying the type is required.
  - When using SASL_SSL or SASL_PLAINTEXT, you must also provide:
    - the SASL mechanism, which can be one of the following: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
    - the SASL username
    - the SASL password
- Select a data format used to deserialize and serialize the keys and values, which can be one of the following:
  - JSON: the JSON format reads and writes JSON data based on a JSON schema.
  - Raw: the Raw format reads and writes raw (byte-based) values as a single column.
  - Avro: the Apache Avro format reads and writes Avro data based on an Avro schema.
  - Debezium (JSON): Debezium provides a unified format schema for changelogs and supports serializing messages using JSON.
- Select a policy for handling record parse errors, which can be either FAIL or IGNORE. When set to FAIL, the connection will change from the Running state to the Stopped state if any validation errors are encountered in the incoming data stream. When set to IGNORE, all valid records will be written, while invalid records will be ignored without stopping the connection or causing any errors or warnings.
For more detailed information about creating and configuring an Apache Kafka cluster, see the Kafka Quickstart guide and related documentation.
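The console steps above can also be performed with the Decodable CLI. The following is a minimal sketch of a source connection that reads JSON records over an unsecured (plaintext) connection; the stream id, topic, and broker addresses are placeholders, and the stream is assumed to exist already.
decodable connection create \
  --name kafka_json_source \
  --description "A basic Kafka source connection reading JSON records" \
  --type source \
  --stream-id 2f4e8a91 \
  --connector kafka \
  --prop format="json" \
  --prop topic="my-topic" \
  --prop bootstrap.servers="broker-hostname-a:9092,broker-hostname-b:9092"
Because security.protocol is not set, data transfers in plain text, matching the PLAINTEXT option in the console.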
Configure As A Sink
When creating a Kafka connector for use as a sink, all of the configuration steps above apply, with the exception of the parse error policy, which is not defined on the outbound end of a data stream.
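As a sketch, the same CLI command creates a sink connection when the connection type is switched to sink; the stream id, topic, and broker address below are placeholders.
decodable connection create \
  --name kafka_json_sink \
  --description "A Kafka sink connection writing JSON records" \
  --type sink \
  --stream-id 2f4e8a91 \
  --connector kafka \
  --prop format="json" \
  --prop topic="output-topic" \
  --prop bootstrap.servers="broker-hostname-a:9092"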
Reference
Connector name | kafka |
Type | source, sink |
Delivery guarantee | at least once or exactly once, based on configuration |
Compatibility | most Kafka 2.x and 3.x-compatible broker clusters |
Additional Kafka Support
If your version of Kafka is not currently supported, please contact [email protected] or join our Slack community and let us know!
Security
Kafka connections may optionally enable SSL transport encryption, which encrypts all network traffic between Decodable and the brokers. In addition, the Kafka connector supports mutual TLS (mTLS) which acts as a bidirectional mode of authentication with the brokers. To configure an mTLS connection, Decodable either provides a self-signed certificate or a certificate signing request (CSR) which you can sign to create a certificate for use with your brokers. See the How To guides for more information.
SSL really means TLS
The Kafka connector (and Apache Kafka itself) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is really TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's really TLS!
Properties
The following properties are supported by the Kafka connector.
Property | Disposition | Description |
---|---|---|
bootstrap.servers | required | A comma-separated list of Kafka brokers to use to bootstrap connections. Each broker must be a colon-separated (:) hostname/port pair. The list must not include any spaces. Example: broker-a:9092,broker-b:9092. Kafka brokers must be able to receive connections from Decodable's network blocks. |
topic | required | A semicolon-separated list of topic names to read data from when the connection type is source, or to write data to when the connection type is sink. |
format | optional; if not specified, value.format must be used | Specifies a data format used to deserialize and serialize the value. Supported formats are ["raw", "json", "avro", "confluent-avro"]. |
value.format | optional; if not specified, format must be used | An alias for format. |
key.format | optional | Specifies a data format used to deserialize and serialize the key. Supported formats are ["raw", "json", "avro", "confluent-avro"]. |
security.protocol | optional | Specifies the protocol used to communicate with brokers. Valid values are: TLS, SASL_PLAINTEXT, SASL_SSL. If not specified, data will transfer in plain text. |
SASL Properties
When configuring a Kafka connection to use SASL Authentication, the following connection properties are used:
Property | Disposition | Description |
---|---|---|
sasl.mechanism | required | Specifies the SASL mechanism as configured by the Kafka broker. Supported values are ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]. |
sasl.username | required | The username or API key for authentication. |
sasl.password | required | The password or API secret key for authentication. |
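As a sketch, these SASL properties can be passed when creating a connection with the CLI; the stream id, topic, broker address, and credentials below are placeholders. A complete example that also configures a schema registry appears in the Avro section below.
decodable connection create \
  --name kafka_sasl_source \
  --type source \
  --stream-id 2f4e8a91 \
  --connector kafka \
  --prop format="json" \
  --prop topic="my-topic" \
  --prop bootstrap.servers="some.broker.cloud:9092" \
  --prop security.protocol=SASL_SSL \
  --prop sasl.mechanism="SCRAM-SHA-256" \
  --prop sasl.username="my-username" \
  --prop sasl.password="my-password"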
TLS Properties
When configuring a Kafka connection to use TLS or mTLS, the following connection properties are used:
Property | Disposition | Description |
---|---|---|
tls.broker.certificate | required | Broker's public certificate used to encrypt traffic from Decodable to external Kafka brokers. The property is only used when security.protocol=TLS. |
tls.client.certificate.type | required | Specifies the client certificate type used to authenticate Kafka clients in Decodable. This property is required when configuring mTLS. Valid values are ["CSR", "Self-Signed"]. The property is only used when security.protocol=TLS. |
tls.broker.signed_client_certificate | required | Specifies the client certificate signed by the broker. This property is required when tls.client.certificate.type=CSR . |
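As a sketch, a TLS-encrypted source connection might be created as follows; the stream id, topic, and broker address are placeholders, and passing the certificate contents via command substitution from a local broker-ca.pem file is an assumption about how the value can be supplied, not documented behavior. For mutual TLS, you would additionally set tls.client.certificate.type (and tls.broker.signed_client_certificate when using CSR) as described in the table above.
decodable connection create \
  --name kafka_tls_source \
  --type source \
  --stream-id 2f4e8a91 \
  --connector kafka \
  --prop format="json" \
  --prop topic="my-topic" \
  --prop bootstrap.servers="broker-hostname-a:9093" \
  --prop security.protocol=TLS \
  --prop tls.broker.certificate="$(cat broker-ca.pem)"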
Avro
A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.
Plain Avro
In order to use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is available on the inferred_schema connection property. For example:
decodable connection get 69caa7e1
avro_test_source
id 69caa7e1
description A Kafka source connection with plain Avro schema
connector kafka
type source
stream id 865e8555
schema
0 field1 INT NOT NULL
1 field2 DOUBLE NOT NULL
2 field3 STRING NOT NULL
properties
bootstrap.servers kafka-kafka-brokers.kafka.svc.cluster.local:9092
format avro
inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
topic avro_topic
target state RUNNING
actual state RUNNING
create time 2022-02-04T19:05:19Z
update time 2022-02-08T17:04:46Z
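As a sketch, a connection like the one shown above can be created with the CLI by setting format=avro; the stream id, topic, and broker mirror the output above, and the Avro schema is inferred from the stream's schema rather than supplied as a property.
decodable connection create \
  --name avro_test_source \
  --description "A Kafka source connection with plain Avro schema" \
  --type source \
  --stream-id 865e8555 \
  --connector kafka \
  --prop format="avro" \
  --prop topic="avro_topic" \
  --prop bootstrap.servers="kafka-kafka-brokers.kafka.svc.cluster.local:9092"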
Avro with Schema Registry
To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.
When using this mode, the following properties are used:
Property | Disposition | Description |
---|---|---|
avro-confluent.schema-registry.url | required | The URL of the schema registry to use. |
avro-confluent.basic-auth.credentials-source | required | The source of the authentication credentials to use, if required. Allowed values are ["USER_INFO", "SASL_INHERIT"]. USER_INFO is the most common choice when schema registry authentication is enabled. |
avro-confluent.basic-auth.user-info | required | The authentication credentials to use. They must be specified in the form of username:password. Note that once set, the credentials are kept secret and will not be visible in the connection properties output. |
For example, you can create a Kafka source connection that uses both SASL authentication and a secured schema registry:
decodable connection create \
--name avro_registry_source \
--description "A Kafka source connection with SASL auth and a schema registry" \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop format="avro-confluent" \
--prop topic="my-topic" \
--prop bootstrap.servers="some.broker.cloud:9092" \
--prop security.protocol=SASL_SSL \
--prop sasl.username="my-api-key" \
--prop sasl.password="my-secret-key" \
--prop sasl.mechanism="PLAIN" \
--prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
--prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
--prop avro-confluent.basic-auth.credentials-source="USER_INFO"
Metadata fields
Apache Kafka source Connections have access to the following metadata fields. See Connection - Data types - Metadata fields for instructions on their use.
Key | Data Type | Description |
---|---|---|
topic | STRING NOT NULL | Topic name of the Kafka record. |
partition | INT NOT NULL | Partition ID of the Kafka record. |
headers | MAP<STRING, BYTES> NOT NULL | Headers of the Kafka record as a map of raw bytes. |
leader-epoch | INT NULL | Leader epoch of the Kafka record if available. |
offset | BIGINT NOT NULL | Offset of the Kafka record in the partition. |
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Timestamp of the Kafka record. |
timestamp-type | STRING NOT NULL | Timestamp type of the Kafka record. One of: NoTimestampType, CreateTime, or LogAppendTime. |