# Confluent Cloud sink connector

## Features

| | |
|---|---|
| Connector name | `confluent-cloud` |
| Delivery guarantee | At least once |
| Supported task sizes | S, M, L |
| Multiplex capability | A single instance of this connector can write to a single topic. |
| Supported stream types | Append stream |

## Configuration properties

### Connection

| Property | Description | Required | Default |
|---|---|---|---|
| `cluster.id` | The ID of your Confluent cluster. | Yes | |
| `cluster.api.endpoint` | The API endpoint for your Confluent cluster. | Yes | |
| `cluster.api.key` | The API key for your Confluent cluster. | Yes | |
| `cluster.api.secret` | The API secret for your Confluent cluster. This must be provided as a secret resource. | Yes | |

### Data

| Property | Description | Required | Default |
|---|---|---|---|
| `topic` | The name of the topic to write data to. | Yes | |
| `key.format` | The format for the key, if present, in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent`, `raw`. See Serialization formats for more details. | Only required when the input streams contain either a primary key or a partition key. | |
| `key.fields` | A list of the key's field names, delimited by semicolons, that comprise the message key. For nested fields, specify the root key only, and the nested fields as part of the connection's schema. | — | |
| `value.format` | The format for the value payload in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent`, `raw`. See Serialization formats for more details. | — | |
| `value.fields-include` | Where primary/partition key fields should appear in the output messages. `ALL`: the message key and value. `EXCEPT_KEY`: the message key only. See the Field Inclusion Policy section for more details on setting this property. | — | `ALL` |

### Format: Avro

| Property | Description | Required | Default |
|---|---|---|---|
| `confluent-registry.url` | The URL of the schema registry to use. | Yes | |
| `confluent-registry.api-key` | The API key for the Schema Registry. | Yes | |
| `confluent-registry.api-secret` | The API secret for the Schema Registry. This must be provided as a secret resource. | Yes | |

## Serialization formats

The following formats are supported:

| Format | Description |
|---|---|
| `json` | JSON data. |
| `raw` | Raw (byte-based) values as a single column. |
| `avro` | Plain Avro data using the schema defined when you create the connection. |
| `avro-confluent` | Avro data using a predefined schema stored in a schema registry. |
| `debezium-json` | A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or `debezium-avro-confluent` if you want to send Change Data Capture (CDC) data through this connector. |
| `debezium-avro-confluent` | A unified format schema for changelogs, with additional support for serializing messages using Avro, with the schema held in a schema registry. Select this option or `debezium-json` if you want to send Change Data Capture (CDC) data through this connector. |

## Avro

A popular choice for data serialization on Confluent Cloud is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

When you use plain Avro (without the schema held in a Schema Registry), the Avro schema is generated from the connection schema and is available on the `inferred_schema` connection property.
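As a minimal sketch of setting this up, a plain-Avro sink connection could be created with the CLI as shown below. The stream fields, topic, and property values are illustrative only; they mirror the example `connection get` output that follows.

```bash
# Sketch only: field names, topic, and credential values are placeholders
# matching the example connection shown below.
decodable connection create --connector confluent-cloud --type sink \
  --name confluent_cloud_sink \
  --description "Plain Avro sink; the Avro schema is inferred from these fields" \
  --field request_id=string \
  --field user_id=int \
  --field ts=bigint \
  --field endpoint=string \
  --prop format="avro" \
  --prop topic="clickstream_topic" \
  --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
  --prop cluster.api.key="NF2GERAAFNABCDEF" \
  --prop cluster.api.secret="d5d7a670" \
  --prop cluster.id="lkc-98765"
```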
For example, retrieving this connection with `decodable connection get` shows the generated schema on the `inferred_schema` property:

```
$ decodable connection get 2428ebe9
confluent_cloud_sink
  id                       2428ebe9
  description              -
  connector                confluent-cloud
  type                     sink
  stream id                0dc6bcbc
  fields
    0  request_id          STRING
    1  user_id             INT
    2  ts                  BIGINT
    3  endpoint            STRING
  primary key fields       -
  properties
    cluster.api.endpoint   https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key        NF2GERAAFNABCDEF
    cluster.api.secret     d5d7a670
    cluster.id             lkc-98765
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"request_id","type":["null","string"],"default":null},{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    topic                  clickstream_topic
[…]
```

### Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it's validated and registered against the configured schema registry.

To use Avro with a Schema Registry such as Confluent's, additional connection properties are required. These can be specified when you create the connection if you're using the CLI or API. If you create the connection through the UI, you'll need to use the CLI or API to update the connection and add the necessary properties.

For example, to create a Confluent Cloud sink connection that writes Avro messages using a secured schema registry with the CLI:

```
$ decodable connection create --connector confluent-cloud --type sink \
    --name avro_registry_sink \
    --description "An example connection with schema registry validation" \
    --field value=string \
    --prop format="avro-confluent" \
    --prop topic="my-topic" \
    --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
    --prop cluster.api.key="ABCDERAAFNJDHZL" \
    --prop cluster.api.secret="605079f4" \
    --prop cluster.id="lkc-54321" \
    --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
    --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
    --prop confluent-registry.api-secret="5b17d53f"
```

## Connector starting state and offsets

A new sink connection starts reading from the **Latest** point in the source Decodable stream. This means that only data written to the stream after the connection has started is sent to the external system. You can override this when you start the connection and choose **Earliest** if you want to send all of the existing data on the source stream to the target system, along with any new data that arrives on the stream.

When you restart a sink connection, it continues reading from the point recorded in the most recent checkpoint taken before the connection stopped. You can also opt to discard the connection's state and restart it afresh from **Earliest** or **Latest**, as described above.

Learn more about starting state here.

## Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection writes those key fields to your topics as the message key. `value.fields-include` determines whether the primary or partition key fields are also included in the message value:

- `ALL`: the primary or partition key fields are written to both the message key and the message value.
- `EXCEPT_KEY`: the primary or partition key fields are included only in the message key.

`EXCEPT_KEY` cannot be set when your input streams do not contain a primary or partition key.
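For illustration, here is a hypothetical sketch of a keyed sink connection that uses `EXCEPT_KEY`. It assumes an input stream whose primary key is `user_id`; all names, topic, and credential values are placeholders, and the key/value layout shown in the comments is only an illustration of the policy described above.

```bash
# Hypothetical sketch: assumes the attached input stream defines user_id as
# its primary key; names and credentials below are placeholders.
decodable connection create --connector confluent-cloud --type sink \
  --name keyed_clickstream_sink \
  --field user_id=int \
  --field endpoint=string \
  --prop topic="clickstream_topic" \
  --prop key.format="json" \
  --prop key.fields="user_id" \
  --prop value.format="json" \
  --prop value.fields-include="EXCEPT_KEY" \
  --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
  --prop cluster.api.key="NF2GERAAFNABCDEF" \
  --prop cluster.api.secret="d5d7a670" \
  --prop cluster.id="lkc-98765"

# With EXCEPT_KEY, a record such as {user_id: 42, endpoint: "/home"} would be
# written as:
#   message key:   {"user_id": 42}
#   message value: {"endpoint": "/home"}
# With ALL (the default), user_id would appear in both the key and the value.
```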