Apache Kafka source connector

Use the Apache Kafka® connector to get data from a Kafka topic into Decodable. If you are looking for information about how to create a connection to get data into Kafka from Decodable, see Apache Kafka.

Features

Delivery guarantee | At least once
Compatibility | Most Kafka 2.x and 3.x-compatible broker clusters, including:
    - Apache Kafka
    - Amazon Managed Streaming for Apache Kafka (MSK)
    - Confluent Platform
    - Confluent Cloud

Create a connection to Apache Kafka

If you want to use the Decodable CLI or API to create the connection, refer to the Property Name column for the underlying property names. The connector name is kafka.

From the Connections page, select the Apache Kafka connector and complete the following fields.

UI Field | Property Name | Description
Connection Type | N/A | Select source to use this connector to get data into Decodable.
Bootstrap Servers | bootstrap.servers | A comma-separated list of your Kafka brokers. You must enter at least one broker. Enter each broker in the format: <host>:<port>. The broker list can’t contain any spaces.
Topic | topic | The name of the topic to get data from.
Broker Security Protocol | security.protocol | Specify the security protocol to use when connecting to a broker. Must be one of the following: PLAINTEXT, TLS, SASL_SSL, SASL_PLAINTEXT. See the Security protocols section for more information about these protocols and for the additional fields that you’ll need to fill out if you want to secure your connection with TLS or SASL Authentication.
Key Format | key.format | The format for the key, if present, in the Kafka message.
Key Fields | key.fields | A list of the key’s field names, delimited by semicolons, that comprise the message key. For nested fields, specify the root key only, and define the nested fields as part of the connection’s schema. See Message Key for more details and an example of using Kafka message keys.
Value Format | value.format | The format for the value payload in the Kafka message.
Parse Error Policy | parse-error-policy | Select the error handling policy. Must be one of the following:
    FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.
    IGNORE: Decodable ignores all invalid records and sends all valid records. With this policy, the connection isn’t stopped, and no errors or warnings are shown when a record is invalid.
Scan Startup Mode | scan.startup.mode | Specifies where to start reading data when the connection is first started, or when it’s restarted with the state discarded. Must be one of the following:
    group-offsets: Start reading data from the committed offsets of the specified consumer group.
    earliest-offset: Start reading data from the earliest available point in the stream.
    latest-offset (default): Start reading data from the latest available point in the stream.
    timestamp: Start reading data from a specific timestamp.
    specific-offset: Start reading data from a specific offset value.
Consumer Group ID | properties.group.id | The group ID used by the consumer. Optional, unless the Scan Startup Mode is set to group-offsets.
Consumer Offset Reset Strategy | properties.auto.offset.reset | What to do when there is no initial offset in Kafka (for example, for a new consumer group) or when the specified offset no longer exists on the server (for example, because of data deletion).

Select the stream that you’d like to connect to this connector. This will be the stream that receives events from Apache Kafka. Then, select Next.

Define the connection’s schema. Select New Schema to manually enter the fields and field types present, or Import Schema if you want to paste the schema in Avro or JSON format. The stream’s schema must match the schema of the data that you plan on sending through this connection. For more information about creating a stream or defining the stream schema, see Create and manage Streams.

Select Next when you are finished defining the connection’s schema.
Give the newly created connection a Name and Description and select Save.

Once you are ready to begin ingestion, activate the connection by clicking Start.

Message Key

To read the data held in the message key, do the following:

- Specify the format of the data in the key with key.format.
- Specify the names of the fields in the key with key.fields.
- Add the key fields and their data types to the connection schema. Do not set these as the primary key in the schema.
- Once the connection is created, add the value.fields-include=EXCEPT_KEY property to it. This can only be done through the CLI (as shown below) or API.

Example

This example is based on a Kafka topic in which the key looks like this:

    { "tenant": { "id": "acme-corp", "region": "us-west-2" } }

and the value like this:

    { "product_id": 1, "order_ts": 1534772501276, "total_amount": 10.50 }

To configure a connection in the UI, do the following:

- Create a connection to your Kafka broker per the steps given above, including the following settings:
    Key Format | JSON
    Key Fields | tenant
    Value Format | JSON
- Define the connection schema. This must include the field from the key (tenant). Do not mark it as a primary key field.
- Save the connection, but don’t activate it yet. Make a note of your connection ID.
- Using the Decodable CLI, update your connection (using the ID from the previous step) to add the value.fields-include property:

    decodable connection update 553feb9e --prop value.fields-include=EXCEPT_KEY

- Activate your connection as normal.

The data from the Kafka message key will be included in the stream.

If you want to create the connection directly, the syntax equivalent to the above connection creation and update would be:

    decodable connection create \
      --name orders \
      --type source \
      --connector kafka \
      --prop bootstrap.servers=broker:port \
      --prop value.format=json \
      --prop key.fields=tenant \
      --prop key.format=json \
      --prop parse-error-policy=FAIL \
      --prop properties.auto.offset.reset=none \
      --prop scan.startup.mode=earliest-offset \
      --prop topic=orders \
      --prop value.fields-include=EXCEPT_KEY \
      --field tenant="ROW<id STRING, region STRING>" \
      --field product_id="INT" \
      --field order_ts="BIGINT" \
      --field total_amount="FLOAT"

Message Metadata

When consuming records from Apache Kafka in Decodable, you can access specific metadata fields associated with each Kafka record. A metadata field is identified by a string-based key and an associated data type. For example, the Apache Kafka connector has a metadata field with the key timestamp and data type TIMESTAMP_LTZ(3). See Available metadata for a list of metadata fields that you have access to.

A metadata field has its type formed as:

    {datatype} METADATA [FROM '{key}']

A metadata field is indicated by the METADATA keyword, with an optional FROM to provide the key. If the key isn’t provided explicitly with FROM, it defaults to the name of the field. For example:

    TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'

Available metadata

When you connect to Apache Kafka as a data source, you have access to the following metadata fields.

Key | Data Type | Description
topic | STRING NOT NULL | Topic name of the Kafka record.
partition | INT NOT NULL | Partition ID of the Kafka record.
headers | MAP<string, bytes> NOT NULL | Headers of the Kafka record as a map of raw bytes.
leader-epoch | INT NULL | Leader epoch of the Kafka record if available.
offset | BIGINT NOT NULL | Offset of the Kafka record in the partition.
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Timestamp of the Kafka record.
timestamp-type | STRING NOT NULL | Timestamp type of the Kafka record. Valid values are: NoTimestampType, CreateTime, or LogAppendTime.

Serialization formats

The following formats are supported for the deserialization of message keys and/or values:

Format | UI name | Description
json | JSON | Read and write JSON data based on a JSON schema.
raw | Raw | Read and write raw (byte based) values as a single column.
avro | Avro | Read and write Avro data based on an Avro schema.
confluent-avro | N/A - configure through CLI/API | Read and write Avro data with a schema registry.
debezium-json | Debezium (JSON) | A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send CDC data through this connector.

Avro

A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

When you use plain Avro (that is, without the schema held in a schema registry), the Avro schema is generated from the connection schema and is available in the inferred_schema connection property.
For example:

    decodable connection get 69caa7e1

    avro_test_source
      id                       69caa7e1
      description              A Kafka source connection with plain Avro schema
      connector                kafka
      type                     source
      stream id                865e8555
      schema
        0  field1              INT NOT NULL
        1  field2              DOUBLE NOT NULL
        2  field3              STRING NOT NULL
      properties
        bootstrap.servers      kafka-kafka-brokers.kafka.svc.cluster.local:9092
        format                 avro
        inferred_schema        {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
        topic                  avro_topic
      target state             RUNNING
      actual state             RUNNING
      create time              2022-02-04T19:05:19Z
      update time              2022-02-08T17:04:46Z

Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry.

To use Avro with a schema registry such as Confluent’s, additional connection properties are required, as listed below. If you use the CLI or API, you can specify them when you create the connection. If you create the connection through the UI, you need to use the CLI or API afterwards to update the connection and add these properties.

When using this mode, the following properties are used:

Property | Required? | Description
avro-confluent.schema-registry.url | required | The URL of the schema registry to use.
avro-confluent.basic-auth.credentials-source | optional | The source of the authentication credentials to use, if required. Allowed values are USER_INFO and SASL_INHERIT. USER_INFO is the typical choice when schema registry authentication is enabled.
avro-confluent.basic-auth.user-info | optional | The authentication credentials to use, if credentials are to be used. They must be specified in the form username:password. Once set, the credentials are kept secret and won’t be visible in the connection properties output.
avro-confluent.long-schema-id | optional | Whether to parse the schema ID as an integer or a long.
If set to true, this allows reading and writing Apicurio-style payloads rather than the Confluent default. Defaults to false.

For example, you can create a source Kafka connection that uses both SASL authentication and a secured schema registry with:

    decodable connection create \
      --name avro_registry_source \
      --description "A Kafka source connection with SASL auth and a schema registry" \
      --type source \
      --stream-id 295e2a7f \
      --connector kafka \
      --prop format="avro-confluent" \
      --prop topic="my-topic" \
      --prop bootstrap.servers="some.broker.cloud:9092" \
      --prop security.protocol=SASL_SSL \
      --prop sasl.username="my-api-key" \
      --prop sasl.password="my-secret-key" \
      --prop sasl.mechanism="PLAIN" \
      --prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
      --prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
      --prop avro-confluent.basic-auth.credentials-source=USER_INFO

Security protocols

Decodable supports four different security protocols for accessing Kafka brokers.

The Kafka connector (and Apache Kafka) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS even though the implementation is TLS. We do the same in order to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it’s TLS!

Security protocol | Description
SASL-authenticated | Username and password authentication is used with SASL. Both SSL/TLS-encrypted and PLAINTEXT (unencrypted) connections are supported, as well as the SCRAM and PLAIN authentication mechanisms.
mTLS-authenticated | Two-way SSL authentication is used, so that Decodable and the Kafka brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL.
TLS-authenticated | One-way SSL authentication is used. The client (Decodable) holds the server’s (Kafka brokers) public certificate.
Data from the Kafka brokers is encrypted using the server’s private key and Decodable can decrypt it using the public certificate. Data from Decodable is encrypted using the public certificate and can only be decrypted using the Kafka broker’s private key.
Unauthenticated | No authentication takes place between Decodable and the Kafka brokers. The connection isn’t encrypted.

See the following pages for specific instructions on how to create a connection to Apache Kafka using the various security protocols:

- How to set up SASL Authentication with Kafka.
- How to set up Mutual TLS Authentication with Kafka.
- How to set up TLS Encryption with Kafka.

SASL properties

When configuring a Kafka connection to use SASL authentication, the following connection properties are used.

UI Field | Property Name | Description
SASL Mechanism | sasl.mechanism | Specify the SASL mechanism as configured by the Kafka broker. Valid values are: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
SASL Username | sasl.username | The username or API key for authentication.
SASL Password | sasl.password | The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.
    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

TLS properties

When configuring a Kafka connection to use TLS or mTLS authentication, the following connection properties are used.

UI Field | Property Name | Description
Broker Certificate | tls.broker.certificate | The public certificate for the broker used to encrypt traffic to Decodable.
TLS Client Certificate Type | tls.client.key | The secret associated with the client TLS key used by mTLS connections.
The key must be an unencrypted key in PKCS#8 format. If you are using the Decodable CLI, this is the ID of the secret resource corresponding to the client TLS key in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.
    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.
N/A | tls.broker.signed_client_certificate | Specifies the client certificate signed by the broker. This property is required when tls.client.certificate.type is set to CSR. You must use the Decodable CLI to specify the client certificate. See Option 2: Using Certificate Sign Request in the How To: Set up Mutual TLS Authentication with Kafka section for more information.

Connector starting state and offsets

When you create a connection, or restart it and discard state, it reads from the position in the topic defined by the Scan Startup Mode. By default this is latest-offset, so the connection reads from the end of the topic.

Learn more about starting state here.
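
As a rough illustration of changing the starting position, the sketch below prints (but does not run) the CLI command that sets scan.startup.mode on an existing connection. It reuses the decodable connection update ... --prop form shown in the Message Key example. The helper name build_startup_mode_cmd and the connection ID 553feb9e are placeholders for illustration only, and the accepted values are the five modes listed in the fields table. The timestamp and specific-offset modes may need further properties that this page doesn't list, so the sketch covers the mode property alone.

```shell
#!/bin/sh
# Hypothetical helper (not part of the Decodable CLI): validate a scan startup
# mode against the values documented on this page and print the matching
# update command so it can be reviewed before being run.
build_startup_mode_cmd() {
  connection_id="$1"
  mode="$2"
  case "$mode" in
    group-offsets|earliest-offset|latest-offset|timestamp|specific-offset)
      printf 'decodable connection update %s --prop scan.startup.mode=%s\n' \
        "$connection_id" "$mode"
      ;;
    *)
      # Reject anything that is not one of the documented modes
      echo "unsupported scan.startup.mode: $mode" >&2
      return 1
      ;;
  esac
}

# Placeholder connection ID; substitute the ID of your own connection
build_startup_mode_cmd 553feb9e earliest-offset
```

Printing the command first keeps the sketch safe to run anywhere. As described above, a new startup mode only applies when the connection is first started or is restarted with its state discarded.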