# Confluent Cloud source connector

## Features

| Feature | Value |
|---|---|
| Connector name | `confluent-cloud` |
| Delivery guarantee | Exactly once |
| Supported task sizes | S, M, L |
| Multiplex capability | A single instance of this connector can read from a single topic. |
| Supported stream types | Append stream |

## Configuration properties

### Connection

| Property | Description | Required | Default |
|---|---|---|---|
| `cluster.id` | The ID of your Confluent cluster. | Yes | |
| `cluster.api.endpoint` | The API endpoint for your Confluent cluster. | Yes | |
| `cluster.api.key` | The API key for your Confluent cluster. | Yes | |
| `cluster.api.secret` | The API secret for your Confluent cluster. This must be provided as a secret resource. | Yes | |

### Data

| Property | Description | Required | Default |
|---|---|---|---|
| `topic` | The name of the topic from which to read data. | Yes | |
| `key.format` | The format of the key, if present, in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `raw`. See Serialization formats for more details. | — | |
| `key.fields` | A list of the key's field names, delimited by semicolons, that comprise the message key. For nested fields, specify the root key only, and declare the nested fields as part of the connection's schema. See Message key for more details and an example of using message keys. | — | |
| `value.format` | The format of the value payload in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent`, `raw`. See Serialization formats for more details. | — | |
| `value.fields-include` | Which fields to include in the payload. `ALL`: all fields are written, including the fields from the key. `EXCEPT_KEY`: all fields from the value of the message are written, but not those from the key. | — | `ALL` |

### Format: Avro

| Property | Description | Required | Default |
|---|---|---|---|
| `confluent-registry.url` | The URL of the schema registry to use. | Yes | |
| `confluent-registry.api-key` | The API key for the Schema Registry. | Yes | |
| `confluent-registry.api-secret` | The API secret for the Schema Registry. This must be provided as a secret resource. | Yes | |

### Advanced

| Property | Description | Required | Default |
|---|---|---|---|
| `parse-error-policy` | Select the error handling policy. Must be one of: `FAIL`: Decodable stops the connection if any validation errors are encountered in the incoming data stream. `IGNORE`: Decodable ignores all invalid records; all validated records are sent. With this policy, the connection isn't stopped, nor are any errors or warnings shown if there is an invalid record. | — | |
| `scan.startup.mode` | Specifies where to start reading data when the connection is first started, or when it's restarted with its state discarded. Must be one of: `group-offsets`: start reading from the committed offsets of a specified consumer group. `earliest-offset`: start reading from the earliest available point in the stream. `latest-offset`: start reading from the latest available point in the stream. `timestamp`: start reading from a specific timestamp. `specific-offset`: start reading from a specific offset value. | — | `latest-offset` |
| `properties.group.id` | The group ID used by the consumer. | Only if `scan.startup.mode` is set to `group-offsets`. | |
| `properties.auto.offset.reset` | What to do when there is no initial offset in Confluent Cloud (such as for a new consumer group), or if the specified offset no longer exists on the server (because of data deletion). | — | |

## Serialization formats

The following formats are supported:

| Format | Description |
|---|---|
| `json` | JSON data. |
| `raw` | Raw (byte-based) values as a single column. |
| `avro` | Plain Avro data using the schema defined when you create the connection. |
| `avro-confluent` | Avro data using a predefined schema stored in a schema registry. |
| `debezium-json` | A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or `debezium-avro-confluent` if you want to send Change Data Capture (CDC) data through this connector. |
| `debezium-avro-confluent` | A unified format schema for changelogs, with additional support for serializing messages using Avro, with the schema held in a schema registry. |
Select this option or `debezium-json` if you want to send Change Data Capture (CDC) data through this connector.

## Avro

A popular choice for data serialization on Confluent Cloud is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

To use plain Avro (without the schema held in a schema registry), the Avro schema is generated from the connection schema and is available in the `inferred_schema` connection property. For example:

```
$ decodable connection get 2428ebe9
confluent_cloud_source
  id                        2428ebe9
  description               -
  connector                 confluent-cloud
  type                      source
  stream id                 0dc6bcbc
  fields
    0  request_id           STRING
    1  user_id              INT
    2  ts                   BIGINT
    3  endpoint             STRING
  primary key fields        -
  properties
    cluster.api.endpoint    https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key         NF2GERAAFNABCDEF
    cluster.api.secret      d5d7a670
    cluster.id              lkc-98765
    format                  avro
    inferred_schema         {"type":"record","name":"record","fields":[{"name":"request_id","type":["null","string"],"default":null},{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    topic                   clickstream_topic
[…]
```

### Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it's validated and registered against the configured schema registry.

To use Avro with a schema registry such as Confluent's, additional connection properties are required. These can be specified when you create the connection if you're using the CLI or API. If you create the connection through the UI, you'll need to use the CLI/API afterwards to update the connection and add the necessary properties.
For example, to create a source Confluent Cloud connection that reads Avro messages using a secured schema registry with the CLI:

```
$ decodable connection create --connector confluent-cloud --type source \
    --name avro_registry_source \
    --description "An example connection with schema registry validation" \
    --field value=string \
    --prop format="avro-confluent" \
    --prop topic="my-topic" \
    --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
    --prop cluster.api.key="ABCDERAAFNJDHZL" \
    --prop cluster.api.secret="605079f4" \
    --prop cluster.id="lkc-54321" \
    --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
    --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
    --prop confluent-registry.api-secret="5b17d53f"
```

## Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by `scan.startup.mode`. By default this is `latest-offset`, so the connector reads from the end of the topic. Learn more about starting state here.

## How to connect to your Confluent Cloud cluster

The cluster ID and API endpoint can be found on the **Cluster overview → Cluster settings** page, in the **Identification** box.

API keys and secrets can be created in Confluent Cloud by clicking **Data Integration → API keys**. Here you can see any API keys you've already created, or create a new one with the **Add key** button. Make sure you save the API key and secret pair somewhere you can find it, since you won't be able to retrieve the secret again through the UI. Click **Next**.

### Choose a topic

Now select the topic you want to connect to. If you're using a newly created API key, you may not see any topics yet, because it takes a minute or two for Confluent to propagate the new key through the system.
Try waiting a bit and clicking **Refresh Topics**.

### Select a stream to connect to

Either select the existing Decodable stream this connection will be connected to, or click **New Stream** to create a new stream for the connection.

- If you're connecting to an existing stream, you can verify the schema and rename fields, but not add or remove any fields.
- If you're creating a new stream, you have three options for defining your schema:
  - Manually define a new schema by adding fields and giving them types.
  - Select **Structured Schema Definition** from the drop-down to upload a schema file that you already have. Select the appropriate type of schema from the **Schema Type** dropdown, then type or paste the schema definition into the **Schema** text box.
  - Select **Schema Registry** to connect to your Confluent Cloud schema registry and get the schema from there. You can get the required information by clicking **Schema Registry** in the Confluent Cloud sidebar. Copy the API endpoint from the **API endpoint** box. API key/secret pairs can be created in the **API credentials** box. Just like for the cluster, make sure to save these, because you won't be able to access the secrets from the UI again.

## Message key

To read the data held in the message key, do the following:

1. Specify the format of the data in the key with `key.format`.
2. Specify the names of the fields in the key with `key.fields`.
3. Set `value.fields-include=EXCEPT_KEY`. If you're using the Decodable web interface to create the connection, you'll need to use the CLI (as shown below) or the API after creating the connection to update it with this property.
4. Add the key fields and their data types to the connection schema. Do not set these as primary key fields in the schema.
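The effect of these settings can be illustrated with a short sketch. This is a conceptual model only, not Decodable internals: it shows which schema fields the connector expects in the message's value payload under each `value.fields-include` setting.

```python
# Conceptual sketch (not Decodable internals) of how value.fields-include
# decides which schema fields are read from the message's value payload.

def value_payload_fields(schema_fields, key_fields, fields_include):
    """Return the schema fields expected in the message value."""
    if fields_include == "ALL":
        # The value payload carries every schema field, key fields included.
        return list(schema_fields)
    if fields_include == "EXCEPT_KEY":
        # Key fields are read from the message key, not the value payload.
        return [f for f in schema_fields if f not in key_fields]
    raise ValueError(f"unknown value.fields-include: {fields_include!r}")

schema = ["tenant", "product_id", "order_ts", "total_amount"]

print(value_payload_fields(schema, ["tenant"], "ALL"))
print(value_payload_fields(schema, ["tenant"], "EXCEPT_KEY"))
```

With `EXCEPT_KEY`, the `tenant` field is absent from the value payload and is populated from the key instead, which is why the example below declares it in the schema but not in the message value.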
### Example

This example is based on a Confluent Cloud topic in which the key looks like this:

```
{
    "tenant": {
        "id": "acme-corp",
        "region": "us-west-2"
    }
}
```

and the value like this:

```
{
    "product_id": 1,
    "order_ts": 1534772501276,
    "total_amount": 10.50
}
```

#### CLI

To create a connection directly based on the above message key and value with the CLI, use:

```
decodable connection create \
    --name orders \
    --type source \
    --connector confluent-cloud \
    --prop bootstrap.servers=broker:port \
    --prop value.format=json \
    --prop key.fields=tenant \
    --prop key.format=json \
    --prop parse-error-policy=FAIL \
    --prop properties.auto.offset.reset=none \
    --prop scan.startup.mode=earliest-offset \
    --prop topic=orders \
    --prop value.fields-include=EXCEPT_KEY \
    --field tenant="ROW<id STRING, region STRING>" \
    --field product_id="INT" \
    --field order_ts="BIGINT" \
    --field total_amount="FLOAT"
```

#### Web app

To configure the connection in the Decodable web app, do the following:

1. Create a connection to your Confluent Cloud broker, including the following settings:
   - Key Format: `JSON`
   - Key Fields: `tenant`
   - Value Format: `JSON`
2. Define the connection schema. This must include the field from the key (`tenant`). Do not mark it as a primary key field.
3. Save the connection, but don't activate it yet. Make a note of your connection ID.
4. Using the Decodable CLI, update your connection (using the ID from the previous step) to add the `value.fields-include` property:

   ```
   decodable connection update 553feb9e --prop value.fields-include=EXCEPT_KEY
   ```

5. Activate your connection as normal.

The data from the Confluent Cloud message key will be included in the stream.
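For illustration, here is a sketch in plain Python (not Decodable output) of the record that results from the example key and value above when `value.fields-include=EXCEPT_KEY`: the nested `tenant` row comes from the message key, and the remaining fields come from the message value.

```python
import json

# Illustrative sketch: with value.fields-include=EXCEPT_KEY, the nested key
# field (`tenant`) is read from the message key, and the remaining schema
# fields come from the message value.
key = json.loads('{"tenant": {"id": "acme-corp", "region": "us-west-2"}}')
value = json.loads('{"product_id": 1, "order_ts": 1534772501276, "total_amount": 10.50}')

# Key fields plus value fields together form the stream record.
record = {**key, **value}

print(record["tenant"]["region"])  # us-west-2
print(sorted(record))              # ['order_ts', 'product_id', 'tenant', 'total_amount']
```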