Confluent Cloud
You can use the Confluent Cloud connector to get data from or send data to a Confluent Cloud cluster. This topic covers how to configure the Confluent Cloud connector to get data from Confluent Cloud into Decodable. If you are looking for information about how to get data from Decodable into Confluent Cloud, see Confluent Cloud in the Connect to a data destination chapter.
Overview
Connector name | confluent-cloud |
---|---|
Type | source, sink |
Delivery guarantee | At least once or exactly once, based on configuration |
Steps
Follow these steps to get data from Confluent Cloud into Decodable.
- From the Connections page, select the Confluent Cloud connector and complete the following fields.
UI Field | Property Name in the Decodable CLI | Description |
---|---|---|
Connection Type | N/A | Select source to use this connector to get data into Decodable. |
Cluster ID | cluster.id | Specify the ID of your Confluent cluster. |
Cluster API Endpoint | cluster.api.endpoint | The API endpoint for your Confluent cluster. |
Cluster API Key | cluster.api.key | The API key for your Confluent cluster. |
Cluster API Secret | cluster.api.secret | The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information. |
Topic | topic | Optional. The topic that you want to receive data from. |
Value Format | value.format | The format for data in the Confluent cluster. Must be one of the following. - JSON: Read and write JSON data based on a JSON schema. - RAW: Read and write raw (byte-based) values as a single column. - AVRO: Read and write Avro data based on an Avro schema. If you are using the Decodable CLI to create or configure this connection, specify avro-confluent to use Avro with a schema registry. In this mode, the Avro schema that you define is validated and registered against the configured schema registry. Otherwise, specify avro to generate the Avro schema from the connection schema. The generated schema is shown in the inferred_schema connection property. For more information on the additional properties used by the Decodable CLI for Avro formats, see Avro. - Debezium (JSON): A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector; see the example record after these steps. |
Parse error policy | parse-error-policy | Select the error handling policy. Must be one of the following: - FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream. - IGNORE: Decodable ignores all invalid records; all valid records are sent. With this policy, the connection is not stopped, and no errors or warnings are shown if there is an invalid record. |
Consumer Group ID | properties.group.id | Optional, unless Scan Startup Mode is set to group-offsets. The group ID used by the consumer. |
Scan Startup Mode | scan.startup.mode | The offset mode that Decodable uses to determine where to start consuming data. Must be one of the following: - earliest-offset: Start from the earliest available offset. - latest-offset: Start from the latest offset. - group-offsets: Start from the offsets committed by the consumer group specified in Consumer Group ID. |
Consumer Offset Reset Strategy | properties.auto.offset.reset | Select the behavior that you want when there is no initial offset in Confluent (for example, a new consumer group) or if the specified offset no longer exists (for example, because the data has been deleted). |
- Enter the Confluent Cloud topic that you'd like to receive data from. Then, select Next.
- Select the stream that you'd like to connect to this connector. Then, select Next.
- Define the connection's schema. Decodable can auto-populate the connection's schema using Confluent's schema registry. In most cases, you'll want to select Schema Registry to automatically populate the connection's schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition.
  - If you want to auto-populate the connection schema using Confluent's schema registry, you'll also need to provide the Schema Registry Endpoint, the Schema Registry API Key, and the Schema Registry API Secret to use to communicate with your schema registry.
- Select Next when you are finished defining the connection's schema.
- Give the newly created connection a Name and Description and select Save.
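If you select Debezium (JSON) as the value format, each record on the topic is expected to carry a Debezium-style change envelope. The following is a minimal sketch of what such a record might look like; before, after, and op are standard Debezium envelope fields, while the payload fields (user_id, ts, endpoint) are illustrative values borrowed from the Avro example later in this topic.

```
{
  "before": null,
  "after": {
    "user_id": 42,
    "ts": 1677526963000,
    "endpoint": "/checkout"
  },
  "op": "c"
}
```

A record with op set to c represents an insert; updates carry both before and after images, and deletes carry only a before image.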
Reference
Avro
A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.
Plain Avro
To use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:
```
decodable connection get 57f8ffb6
avro_example_source
  id                  57f8ffb6
  description         An example source connection with plain Avro schema
  connector           confluent-cloud
  type                source
  stream id           3843bb39
  fields
    0  user_id        INT
    1  ts             BIGINT
    2  endpoint       STRING
  primary key fields  -
  properties
    cluster.api.endpoint           https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key                ABCDERAAFNJDHZL
    cluster.api.secret             605079f4
    cluster.id                     lkc-12345
    confluent-registry.api-key     ABCDE6B2WQJDHFUP
    confluent-registry.api-secret  5b17d53f
    confluent-registry.url         https://example-registry.us-east-2.aws.confluent.cloud
    format                         avro
    parse-error-policy             FAIL
    properties.auto.offset.reset   none
    scan.startup.mode              earliest-offset
    topic                          my_example_topic
  inferred_schema     {"type":"record","name":"record","fields":[{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
  target state        RUNNING
  actual state        RUNNING
  requested tasks     1
  actual tasks        0
  requested task size M
  actual task size    -
  create time         2023-02-27T19:42:43Z
  update time         2023-02-27T19:43:06Z
```
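For reference, a connection like the one above could be created from the CLI with something along the lines of the following sketch. It reuses the flags from the schema registry example later in this topic; the endpoint, key, and secret values are placeholders, and the --field types assume the Decodable SQL types shown in the output above.

```
./decodable connection create --connector confluent-cloud --type source \
  --name avro_example_source \
  --description "An example source connection with plain Avro schema" \
  --field user_id=int \
  --field ts=bigint \
  --field endpoint=string \
  --prop format="avro" \
  --prop topic="my_example_topic" \
  --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
  --prop cluster.api.key="ABCDERAAFNJDHZL" \
  --prop cluster.api.secret="605079f4" \
  --prop cluster.id="lkc-12345"
```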
Avro with Schema Registry
To use Avro with a schema registry, the connection property format=avro-confluent is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.
When using this mode, the following properties are used.
Property | Required? | Description |
---|---|---|
confluent-registry.url | required | The URL of the schema registry to use. |
confluent-registry.api-key | required | The username or API key to use. |
confluent-registry.api-secret | required | The password associated with the username. This must be provided as a secret resource. If you are using the Decodable CLI, run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Manage Roles, Groups, and Permissions for more information. |
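Because both the cluster and registry secrets are referenced by the ID of a secret resource rather than by the literal secret value, it can be useful to look up those IDs before creating the connection. The commands below are the ones referenced in the tables above:

```
# List the secret resources in your account to find the IDs to pass as
# cluster.api.secret and confluent-registry.api-secret.
decodable secret list

# Show help for the secret commands, including how to create a new secret.
decodable secret --help
```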
The following command is an example of how to create a connection to Confluent Cloud using SASL authentication and a secured schema registry.
```
./decodable connection create --connector confluent-cloud --type source \
  --name avro_registry_source \
  --description "An example connection with schema registry validation" \
  --field value=string \
  --prop format="avro-confluent" \
  --prop topic="my-topic" \
  --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
  --prop cluster.api.key="ABCDERAAFNJDHZL" \
  --prop cluster.api.secret="605079f4" \
  --prop cluster.id="lkc-54321" \
  --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
  --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
  --prop confluent-registry.api-secret="5b17d53f"
```
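Once the connection is created, you can confirm the registered properties and the derived schema by fetching it, as in the plain Avro example above. Replace the placeholder with the ID of your new connection:

```
decodable connection get <connection-id>
```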
Metadata fields
When you connect to Confluent Cloud, you have access to Apache Kafka metadata fields. To see a complete list of metadata fields provided and further instructions, see Extract message metadata.