Apache Kafka

Decodable supports both sourcing data from, and sinking data to, Apache Kafka, which is an open-source distributed event streaming platform optimized for ingesting and processing streaming data in real-time. Kafka provides three main functions to its users:

  • Publish and subscribe to streams of records
  • Effectively store streams of records in the order in which records were generated
  • Process streams of records in real time

Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storage and analysis of both historical and real-time data.

Getting Started

Connections come in two flavors: source and sink. Source connections read from an external system and write to a Decodable stream, while sink connections read from a stream and write to an external system. Kafka connectors can be used in either role. The configuration is almost identical, except that source connectors also let you define a policy for handling parse errors in the incoming data stream.

Configure As A Source

To create and configure a connector for Kafka, sign in to the Decodable Web Console, navigate to the Connections tab, click on New Connection, and follow the steps below. For examples of using the command line tools or scripting, see the How To guides.

  1. Select a connector type, either source or sink, which will determine whether data is being streamed into Decodable for processing by one or more pipelines, or streamed out to an external system.

  2. Provide a list of one or more Kafka brokers to use to bootstrap connections. Each broker is specified as a hostname and port pair separated by a colon (e.g., broker-hostname-a:9092). Multiple brokers must be separated by commas with no spaces.

  3. Provide a semicolon-separated list of one or more topic names to read data from when the connection type is source, or to write data to when the connection type is sink.

  4. Select a broker security protocol, which can be one of the following: PLAINTEXT, TLS, SASL_SSL, or SASL_PLAINTEXT. For more information about Kafka security, see the Security section under Reference below.

    • When using TLS, you must also provide:
      • the broker's public certificate, which is used to encrypt traffic from Decodable to external Kafka brokers
      • the client certificate type used to authenticate Kafka clients in Decodable, which can be one of the following: none, SELF_SIGNED, or CSR
      • For mutual TLS authentication, specifying the type is required
    • When using SASL_SSL or SASL_PLAINTEXT, you must also provide:
      • the SASL mechanism, which can be one of the following: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
      • the SASL username
      • the SASL password
  5. Select a data format used to deserialize and serialize the keys and values, which can be one of the following:

    • JSON: allows reading and writing JSON data based on a JSON schema.
    • Raw: allows reading and writing raw (byte-based) values as a single column.
    • Avro: allows reading and writing Avro data based on an Avro schema.
    • Debezium (JSON): Debezium provides a unified format schema for changelogs and supports serializing messages using JSON.
  6. Select a policy for handling record parse errors, which can be either FAIL or IGNORE. When set to FAIL, the connection will change from the Running state to the Stopped state if any validation errors are encountered in the incoming data stream. When set to IGNORE, all valid records will be written, while invalid records will be dropped without stopping the connection or causing any errors or warnings.

For more detailed information about creating and configuring an Apache Kafka cluster, see the Kafka Quickstart guide and related documentation.
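The same configuration can also be expressed with the Decodable CLI. The following is a minimal sketch of creating a Kafka source connection equivalent to the steps above; the connection name, stream ID, broker addresses, and topic are placeholder values you would replace with your own:

# Minimal Kafka source connection using JSON (placeholder values)
decodable connection create \
--name kafka_json_source \
--description "A basic Kafka source connection using JSON" \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop bootstrap.servers="broker-hostname-a:9092,broker-hostname-b:9092" \
--prop topic="my-topic" \
--prop format="json"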

Configure As A Sink

To create a Kafka connector for use as a sink, all the configuration steps above apply, with the exception of the parse error policy, which is not defined on the outbound end of a data stream.
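As a sketch, a sink connection differs only in the connection type and the stream it reads from; the values below are placeholders:

# Minimal Kafka sink connection using JSON (placeholder values)
decodable connection create \
--name kafka_json_sink \
--description "A basic Kafka sink connection using JSON" \
--type sink \
--stream-id 295e2a7f \
--connector kafka \
--prop bootstrap.servers="broker-hostname-a:9092" \
--prop topic="output-topic" \
--prop format="json"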

Reference

  • Connector name: kafka
  • Type: source, sink
  • Delivery guarantee: at least once or exactly once, based on configuration
  • Compatibility: most Kafka 2.x and 3.x-compatible broker clusters

Additional Kafka Support
If your version of Kafka is not currently supported, please contact [email protected] or join our Slack community and let us know!

Security

Kafka connections may optionally enable SSL transport encryption, which encrypts all network traffic between Decodable and the brokers. In addition, the Kafka connector supports mutual TLS (mTLS) which acts as a bidirectional mode of authentication with the brokers. To configure an mTLS connection, Decodable either provides a self-signed certificate or a certificate signing request (CSR) which you can sign to create a certificate for use with your brokers. See the How To guides for more information.

SSL really means TLS
The Kafka connector (and Apache Kafka) actually implements TLS which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is really TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's really TLS!

Properties

The following properties are supported by the Kafka connector.

  • bootstrap.servers (required): A comma-separated list of Kafka brokers to use to bootstrap connections. Each broker must be a colon-separated (:) hostname/port pair. The list must not include any spaces. Example: broker-a:9092,broker-b:9092. Kafka brokers must be able to receive connections from Decodable's network blocks.
  • topic (required): A semicolon-separated list of topic names to read data from when the connection type is source, or to write data to when the connection type is sink.
  • format (optional; if not specified, value.format must be used): Specifies a data format used to deserialize and serialize the value. Supported formats are ["raw", "json", "avro", "confluent-avro"].
  • value.format (optional; if not specified, format must be used): An alias for format.
  • key.format (optional): Specifies a data format used to deserialize and serialize the key. Supported formats are ["raw", "json", "avro", "confluent-avro"].
  • security.protocol (optional): Specifies the protocol used to communicate with brokers. Valid values are: TLS, SASL_PLAINTEXT, SASL_SSL. If not specified, data will transfer in plain text.
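Because the key and value formats can be set independently, a connection can, for example, pair a raw key with a JSON value. The following is an illustrative sketch only; all names and identifiers are placeholders:

# Source connection with separate key and value formats (placeholder values)
decodable connection create \
--name kafka_keyed_source \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop bootstrap.servers="broker-a:9092,broker-b:9092" \
--prop topic="my-topic" \
--prop key.format="raw" \
--prop value.format="json"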

SASL Properties

When configuring a Kafka connection to use SASL Authentication, the following connection properties are used:

  • sasl.mechanism (required): Specifies the SASL mechanism as configured by the Kafka broker. Supported values are ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"].
  • sasl.username (required): The username or API key for authentication.
  • sasl.password (required): The password or API secret key for authentication.

TLS Properties

When configuring a Kafka connection to use TLS or mTLS, the following connection properties are used:

  • tls.broker.certificate (required): The broker's public certificate, used to encrypt traffic from Decodable to external Kafka brokers. This property is only used when security.protocol=TLS.
  • tls.client.certificate.type (required): Specifies the client certificate type used to authenticate Kafka clients in Decodable. This property is required when configuring mTLS. Valid values are ["CSR", "Self-Signed"]. This property is only used when security.protocol=TLS.
  • tls.broker.signed_client_certificate (required): Specifies the client certificate signed by the broker. This property is required when tls.client.certificate.type=CSR.
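As an illustrative sketch of where these properties fit (not a complete mTLS walkthrough; the CSR signing workflow itself is covered in the How To guides), a connection using the CSR certificate type might combine the properties along these lines, with certificate contents shown as abbreviated placeholders:

# mTLS source connection using the CSR flow (placeholder values; certificates abbreviated)
decodable connection create \
--name kafka_mtls_source \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop bootstrap.servers="secure-broker:9093" \
--prop topic="my-topic" \
--prop format="json" \
--prop security.protocol=TLS \
--prop tls.broker.certificate="-----BEGIN CERTIFICATE-----..." \
--prop tls.client.certificate.type=CSR \
--prop tls.broker.signed_client_certificate="-----BEGIN CERTIFICATE-----..."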

Avro

A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

In order to use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema, and is available on the inferred_schema connection property. For example:

decodable connection get 69caa7e1
avro_test_source
 id 69caa7e1
 description A Kafka source connection with plain Avro schema
 connector kafka
 type source
 stream id 865e8555
 schema
 0 field1 INT NOT NULL
 1 field2 DOUBLE NOT NULL
 2 field3 STRING NOT NULL
 properties
 bootstrap.servers kafka-kafka-brokers.kafka.svc.cluster.local:9092
 format avro
 inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
 topic avro_topic
 target state RUNNING
 actual state RUNNING
 create time 2022-02-04T19:05:19Z
 update time 2022-02-08T17:04:46Z
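As a sketch, a connection like the one shown above could have been created with a command along the following lines (the stream ID, broker address, and topic are taken from the output above and would differ in your environment):

# Plain Avro source connection; the Avro schema is inferred from the connection schema
decodable connection create \
--name avro_test_source \
--description "A Kafka source connection with plain Avro schema" \
--type source \
--stream-id 865e8555 \
--connector kafka \
--prop bootstrap.servers="kafka-kafka-brokers.kafka.svc.cluster.local:9092" \
--prop topic="avro_topic" \
--prop format="avro"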

Avro with Schema Registry

To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.

When using this mode, the following properties are used:

  • avro-confluent.schema-registry.url (required): The URL of the schema registry to use.
  • avro-confluent.basic-auth.credentials-source (required): The source of the authentication credentials to use, if required. Allowed values are ["USER_INFO", "SASL_INHERIT"]. USER_INFO is the most common choice when schema registry authentication is enabled.
  • avro-confluent.basic-auth.user-info (required): The authentication credentials to use, specified in the form username:password. Note that the credentials are kept secret once set and will not be visible in the connection properties output.

For example, you can create a Kafka source connection that uses both SASL authentication and a secured schema registry:

decodable connection create \
--name avro_registry_source \
--description "A Kafka source connection with SASL auth and a schema registry" \
--type source \
--stream-id 295e2a7f \
--connector kafka \
--prop format="confluent-avro" \
--prop topic="my-topic" \
--prop bootstrap.servers="some.broker.cloud:9092" \
--prop security.protocol=SASL_SSL \
--prop sasl.username="my-api-key" \
--prop sasl.password="my-secret-key" \
--prop sasl.mechanism="PLAIN" \
--prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
--prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
--prop avro-confluent.basic-auth.credentials-source="USER_INFO"

Apache Kafka, Kafka®, Apache® and associated open source project names are either registered trademarks or trademarks of The Apache Software Foundation.
