Confluent Cloud source connector You can use the Confluent Cloud connector to get data from or send data to a Confluent Cloud cluster. This topic covers how to configure the Confluent Cloud Connector to get data from Confluent Cloud into Decodable. If you are looking for information about how to get data from Decodable into Confluent Cloud, see Confluent Cloud in the Connect to a data destination chapter. Features Delivery guarantee At least once or exactly once, based on configuration Steps If you want to use the Decodable CLI or API to create the connection, you can refer to the Property Name column for information about what the underlying property names are. The connector name is confluent-cloud. From the Connections page, select the Confluent Cloud connector and complete the following fields. UI Field Property Name Description Connection Type N/A Select source to use this connector to get data into Decodable. Cluster ID cluster.id Specify the ID of your Confluent cluster. Cluster API Endpoint cluster.api.endpoint The API endpoint for your Confluent cluster. Cluster API Key cluster.api.key The API key for your Confluent cluster. Cluster API Secret cluster.api.secret The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information. Topic topic Optional. The topic that you want to receive data from. Value Format value.format The format for data in the Confluent cluster. Must be one of the following. JSON: Read and write JSON data based on a JSON schema. RAW: Read and write raw (byte based) values as a single column. AVRO: Read and write Avro data based on an Avro schema. If you are using the Decodable CLI to create or configure this connection, specify confluent-avro to use Avro with a schema registry. In this mode, the Avro schema that you define is validated and registered against the configured schema registry. Otherwise, specify avro to generate the Avro schema from the connection schema. The generated schema will be shown in the inferred_schema connection property. For more information on the additional properties used by the Decodable CLI for Avro formats, see Avro. Debezium (JSON): A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector. Parse error policy parse-error-policy Select the error handling policy. Must be one of the following: FAIL: When set to FAIL, Decodable stops the connection if any validation errors are encountered in the incoming data stream. IGNORE: When set to IGNORE, Decodable ignores all invalid records. All validated records are sent. With this policy, the connection isn’t stopped nor will any errors or warnings be shown if there is an invalid record. Scan Startup Mode scan.startup.mode Specifies where to start reading data when the connection is first started, or when it’s restarted with the state discarded. Must be one of the following: group-offsets: Start reading data from a specified group consumer. earliest-offset: Start reading data from the earliest available point in the stream. latest-offset (default): Start reading data from the latest available point in the stream. timestamp: Start reading data from a specific timestamp. specific-offset: Start reading data from a specific offset value. Consumer Group ID properties.group.id The group ID used by the consumer. Optional, unless the Scan Startup Mode is set to group-offsets. Consumer Offset Reset Strategy properties.auto.offset.reset Select the behavior that you want when there is no initial offset in Confluent (like a new consumer group) or if the specified offset doesn’t exist (such as data deletion). Enter the Confluent Cloud topic that you’d like to receive data from. Then, select Next. Select the stream that you’d like to connect to this connector. Then, select Next. Define the connection’s schema. Decodable can automatically populate the connection’s schema using Confluent’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition. If you want to automatically populate the connection schema using Confluent’s schema registry, you’ll also need to provide the Schema Registry Endpoint, the Schema Registry API Key, and the Schema Registry API Secret to use to communicate with your schema registry. Select Next when you are finished providing defining the connection’s schema. Give the newly created connection a Name and Description and select Save. Serialization formats: Avro A popular choice for data serialization on Apache Kafka is Apache Avro. Decodable supports both plain Avro, as well as Avro with a schema registry. Plain Avro In order to use plain Avro, schema connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema, and is available on the inferred_schema connection property. For example: decodable connection get 57f8ffb6 avro_example_source id 57f8ffb6 description An example source connection with plain Avro schema connector confluent-cloud type source stream id 3843bb39 fields 0 user_id INT 1 ts BIGINT 2 endpoint STRING primary key fields - properties cluster.api.endpoint https://example.us-west-2.aws.confluent.cloud:443 cluster.api.key ABCDERAAFNJDHZL cluster.api.secret 605079f4 cluster.id lkc-12345 confluent-registry.api-key ABCDE6B2WQJDHFUP confluent-registry.api-secret 5b17d53f confluent-registry.url https://example-registry.us-east-2.aws.confluent.cloud format {"type":"record","name":"record","fields":[{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]} parse-error-policy FAIL properties.auto.offset.reset none scan.startup.mode earliest-offset avro inferred_schema topic my_example_topic target state RUNNING actual state RUNNING requested tasks 1 actual tasks 0 requested task size M actual task size - create time 2023-02-27T19:42:43Z update time 2023-02-27T19:43:06Z Avro with Schema Registry To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry. When using this mode, the following properties are used. Property Required? Description confluent-registry.url required The URL of the schema registry to use. confluent-registry.api-key required The username or API key to use to use. confluent-registry.api-secret required The password associated with the username. This must be provided as a secret resource. If you are using the Decodable CLI, run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret. Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information. The following command is an example of how to create a connection to Confluent Cloud using SASL Authentication and secured schema registry. ./decodable connection create --connector confluent-cloud --type source \ --name avro_registry_source \ --description "An example connection with schema registry validation" \ --field value=string \ --prop format="avro-confluent" \ --prop topic="my-topic" \ --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \ --prop cluster.api.key="ABCDERAAFNJDHZL" \ --prop cluster.api.secret="605079f4" \ --prop cluster.id="lkc-54321" \ --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \ --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \ --prop confluent-registry.api-secret="5b17d53f" Metadata fields When you connect to Confluent Cloud, you have access to Apache Kafka metadata fields. To see a complete list of metadata fields provided and further instructions, see message metadata. Connector starting state and offsets When you create a connection, or restart it and discard state, it will read from the position in the topic as defined by scan startup mode. By default this is latest-offset and will therefore read from the end of the topic. Learn more about starting state here.