# Confluent Cloud sink connector

## Features

| Feature | Details |
|---|---|
| Connector name | `confluent-cloud` |
| Delivery guarantee | At least once |
| Supported task sizes | S, M, L |
| Multiplex capability | A single instance of this connector can write to multiple topics. |
| Supported stream types | Append stream, Change stream |

Although the Confluent Cloud connector supports both types of stream, a single Confluent Cloud connection may only use one type of stream. If you want to send data from both change streams and append streams, then you must create separate Confluent Cloud connections.

## Configuration properties

### Connection

| Property | Description | Required | Default |
|---|---|---|---|
| `cluster.id` | The ID of your Confluent cluster. | Yes | |
| `cluster.api.endpoint` | The API endpoint for your Confluent cluster. | Yes | |
| `cluster.api.key` | The API key for your Confluent cluster. | Yes | |
| `cluster.api.secret` | The API secret for your Confluent cluster. This must be provided as a secret resource. | Yes | |

### Data

| Property | Description | Required | Default |
|---|---|---|---|
| `key.format` | The format for the key, if present, in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent` (see below for further properties). See Serialization formats for more details. | Only required when the input streams contain either a primary key or a partition key. | |
| `value.format` | The format for the value payload in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent` (see below for further properties). See Serialization formats for more details. | — | |
| `value.fields-include` | Where primary/partition key fields should appear in the output messages. `ALL`: in both the message key and the message value. `EXCEPT_KEY`: in the message key only. See the Field Inclusion Policy section for more details on setting this property. | — | `ALL` |

### Format: Avro

| Property | Description | Required | Default |
|---|---|---|---|
| `confluent-registry.url` | The URL of the schema registry to use. | Yes | |
| `confluent-registry.api-key` | The API key for the Schema Registry. | Yes | |
| `confluent-registry.api-secret` | The API secret for the Schema Registry. This must be provided as a secret resource. | Yes | |

## Topic names

By default, Decodable uses the stream name as the name of the topic it writes to. If a topic already exists with that name, Decodable writes to the existing topic. If it doesn't exist, Decodable creates it.

You can change the name of the topic to which Decodable writes either in the web interface, or by using `output-resource-name-template` when calling `decodable connection scan`.

## Writing data to multiple topics

A single instance of this connector can write to multiple topics. If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to write to using `decodable connection scan`.

## Resource specifier keys

When using the `decodable connection scan` command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

| Name | Description |
|---|---|
| `topic` | The topic name |

## Serialization formats

The following formats are supported:

| Format | Description |
|---|---|
| `json` | JSON data. |
| `avro` | Plain Avro data using the schemas of the Streams attached to the connection. |
| `avro-confluent` | Avro data using a predefined schema stored in a schema registry. |
| `debezium-json` | A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option or `debezium-avro-confluent` if you want to send Change Data Capture (CDC) data through this connector. |
| `debezium-avro-confluent` | A unified format schema for changelogs with additional support for serializing messages using Avro, with the schema held in a schema registry. Select this option or `debezium-json` if you want to send Change Data Capture (CDC) data through this connector. |
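For example, to send CDC data from a change stream as Debezium-formatted JSON, a connection can be defined declaratively. The following is a minimal sketch that follows the same structure as the `avro-confluent` example shown later on this page; the stream and topic names are placeholders, and the cluster values are the same illustrative ones used in that example.

```yaml
---
# Minimal sketch: stream and topic names are placeholders, and the cluster
# values are illustrative only. debezium-json needs no schema registry settings.
kind: connection
metadata:
  name: cc-debezium-json-sink
  description: Confluent Cloud sink connection with debezium-json format
spec_version: v2
spec:
  connector: confluent-cloud
  type: sink
  stream_mappings:
    - stream_name: orders_cdc                 # a change stream in your account
      external_resource_specifier:
        topic: orders_cdc_topic               # the Kafka topic to write to
  properties:
    value.format: debezium-json
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4              # provide as a secret resource
    cluster.id: lkc-54321
```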
## Avro and Debezium-Avro with Schema Registry

In this mode, the Avro schemas are derived from the schemas of the Streams attached to the connection, but they are validated and registered against the configured schema registry. To use Avro or Debezium-Avro with a Schema Registry such as Confluent's, additional connection properties are required.

Here is an example resource YAML that describes a Confluent Cloud connection with the `avro-confluent` format that you can create using the Decodable CLI:

```yaml
---
kind: connection
metadata:
  name: cc-avro-confluent-sink
  description: Confluent Cloud sink connection with avro-confluent format
spec_version: v2
spec:
  connector: confluent-cloud
  type: sink
  stream_mappings:
    - stream_name: avro_out_1
      external_resource_specifier:
        topic: sink_topic_1
    - stream_name: avro_out_2
      external_resource_specifier:
        topic: sink_topic_2
  properties:
    value.format: avro-confluent
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4
    cluster.id: lkc-54321
    confluent-registry.url: https://example-registry.us-east-2.aws.confluent.cloud
    confluent-registry.api-key: ABCDJ6B2WQMHXL7K
    confluent-registry.api-secret: 5b17d53f
```

## Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable streams. This means that only data written to the streams after the connection has started will be sent to the external system. You can override this to Earliest when you start the connection if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection, it will continue reading from the point it most recently stored in a checkpoint before the connection stopped. You can also opt to discard the connection's state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.

## How to connect to your Confluent Cloud cluster

### Cluster Details

The cluster ID and API endpoint can be found on the Cluster overview -> Cluster settings page, in the Identification box.

API keys and secrets can be created in Confluent Cloud by clicking Data Integration -> API keys. Here you can see any API keys you've already created, or create a new one with the "Add key" button. Make sure you save the API key and secret pair somewhere you can find it, since you won't be able to retrieve the secret again through the UI.
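Once you have the API secret, store it as a Decodable secret resource so that `cluster.api.secret` (and, if used, `confluent-registry.api-secret`) can reference it rather than holding the value in plain text. The following is only a rough sketch of what a declarative secret definition might look like; the resource name is a placeholder and the spec fields (such as `value_literal`) are assumptions, so check the Decodable secret resource reference for the exact schema.

```yaml
---
# Hypothetical sketch of a secret resource holding the Confluent Cloud API secret.
# The name is a placeholder and the spec fields are assumptions; verify against
# the Decodable secret resource reference for your CLI version.
kind: secret
metadata:
  name: cc-cluster-api-secret
  description: API secret for the Confluent Cloud cluster
spec_version: v1
spec:
  value_literal: <your Confluent Cloud API secret>
```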
## Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection will write those key fields to your topics as the message key. `value.fields-include` determines whether the primary or partition key fields should be included in the message value as well.

When `value.fields-include` = `ALL`, the primary or partition key fields will be written to both the message key and the message value.

When `value.fields-include` = `EXCEPT_KEY`, the primary or partition key fields will only be included in the message key.

`EXCEPT_KEY` cannot be set when a primary or partition key does not exist in your input streams.

### Example

For this example, assume an input Stream has three fields:

- `key` (the primary or partition key field)
- `field1`
- `field2`

And assume a record in the Stream is as follows:

| Field Name | Value |
|---|---|
| `key` | `key_value` |
| `field1` | `field1_value` |
| `field2` | `field2_value` |

When `value.fields-include` = `ALL`, the resulting message key and message value written to your topic will be:

Message Key

```json
{
  "key": "key_value"
}
```

Message Value

```json
{
  "key": "key_value",
  "field1": "field1_value",
  "field2": "field2_value"
}
```

When `value.fields-include` = `EXCEPT_KEY`, the resulting message key and message value written to your topic will be:

Message Key

```json
{
  "key": "key_value"
}
```

Message Value

```json
{
  "field1": "field1_value",
  "field2": "field2_value"
}
```
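To produce the `EXCEPT_KEY` behavior shown above, set `value.fields-include` in the connection's properties. The sketch below shows where the property sits in a declarative definition, reusing the structure of the earlier `avro-confluent` example; the stream and topic names are placeholders, the cluster values are illustrative only, and the mapped stream is assumed to have a primary or partition key, which `EXCEPT_KEY` requires.

```yaml
---
# Minimal sketch: placeholders throughout. The mapped stream is assumed to have
# a primary or partition key, which EXCEPT_KEY requires.
kind: connection
metadata:
  name: cc-except-key-sink
  description: Confluent Cloud sink that writes key fields to the message key only
spec_version: v2
spec:
  connector: confluent-cloud
  type: sink
  stream_mappings:
    - stream_name: keyed_stream               # a stream with a primary/partition key
      external_resource_specifier:
        topic: keyed_topic                    # the Kafka topic to write to
  properties:
    key.format: json
    value.format: json
    value.fields-include: EXCEPT_KEY          # key fields omitted from the message value
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4              # provide as a secret resource
    cluster.id: lkc-54321
```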