Confluent Cloud sink connector

Features

Feature | Value
-------------------------------
Connector name       | confluent-cloud
Delivery guarantee   | At least once
Supported task sizes | S, M, L

A single instance of this connector can write to multiple topics.

Supported stream types

Although the Confluent Cloud connector supports both append streams and change streams, a single Confluent Cloud connection can use only one type of stream. To send data from both change streams and append streams, create a separate Confluent Cloud connection for each type.
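As a rough illustration of the rule above, the following Python sketch groups streams by type so that each group can go into its own connection. The function name, the "append"/"change" labels, and the stream names are hypothetical and chosen only for this example; they are not part of the Decodable API.

```python
from collections import defaultdict


def group_streams_by_type(streams):
    """Group (stream_name, stream_type) pairs so each type gets its own connection.

    The "append" / "change" labels are illustrative names, not Decodable identifiers.
    """
    groups = defaultdict(list)
    for name, stream_type in streams:
        if stream_type not in ("append", "change"):
            raise ValueError(f"unknown stream type for {name!r}: {stream_type!r}")
        groups[stream_type].append(name)
    return dict(groups)


# Hypothetical streams: two append streams and one change stream.
plan = group_streams_by_type([
    ("orders", "append"),
    ("customers_cdc", "change"),
    ("clicks", "append"),
])
for stream_type, names in plan.items():
    print(f"one {stream_type}-stream connection for: {', '.join(names)}")
```

Each key in the result corresponds to one Confluent Cloud connection that you would define separately.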

Configuration properties

Connection

Property | Description | Required | Default
-------------------------------
cluster.id           | The ID of your Confluent cluster. | Yes |
cluster.api.endpoint | The API endpoint for your Confluent cluster. | Yes |
cluster.api.key      | The API key for your Confluent cluster. | Yes |
cluster.api.secret   | The API secret for your Confluent cluster. This must be provided as a secret resource. | Yes |

Data

Property | Description | Required | Default
-------------------------------
key.format           | The format for the key, if present, in the message. Must be one of the formats listed under Serialization formats. | Only required when the input streams contain either a primary key or a partition key. |
value.format         | The format for the value payload in the message. Must be one of the formats listed under Serialization formats. | |
value.fields-include | Where primary or partition key fields should appear in the output messages. ALL: the message key and value. EXCEPT_KEY: the message key only. See the Field Inclusion Policy section for more details on setting this property. | | ALL

Format: Avro

Property | Description | Required | Default
-------------------------------
confluent-registry.url        | The URL of the schema registry to use. | Yes |
confluent-registry.api-key    | The API key for the Schema Registry. | Yes |
confluent-registry.api-secret | The API secret for the Schema Registry. This must be provided as a secret resource. | Yes |
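The requirement rules in the tables above can be summarized with a small pre-flight check. This is a minimal sketch, not part of the Decodable CLI: the function name and the streams_have_key flag are hypothetical, and it assumes the confluent-registry.* properties are needed whenever the key or value format is avro-confluent or debezium-avro-confluent.

```python
# Property names are taken from the tables above.
REQUIRED_CONNECTION_PROPERTIES = (
    "cluster.id",
    "cluster.api.endpoint",
    "cluster.api.key",
    "cluster.api.secret",
)
REGISTRY_PROPERTIES = (
    "confluent-registry.url",
    "confluent-registry.api-key",
    "confluent-registry.api-secret",
)
# Assumption: these are the formats that rely on a schema registry.
REGISTRY_FORMATS = {"avro-confluent", "debezium-avro-confluent"}


def missing_properties(properties, streams_have_key=False):
    """Return the property names that appear to be missing from a connection."""
    missing = [p for p in REQUIRED_CONNECTION_PROPERTIES if p not in properties]
    # key.format is only required when the input streams carry a key.
    if streams_have_key and "key.format" not in properties:
        missing.append("key.format")
    formats = {properties.get("key.format"), properties.get("value.format")}
    if formats & REGISTRY_FORMATS:
        missing.extend(p for p in REGISTRY_PROPERTIES if p not in properties)
    return missing


example = {
    "cluster.id": "lkc-54321",
    "cluster.api.endpoint": "https://example.us-west-2.aws.confluent.cloud:443",
    "cluster.api.key": "my-api-key",
    "cluster.api.secret": "my-secret-resource",
    "value.format": "avro-confluent",
}
print(missing_properties(example))
```

Running a check like this before creating a connection only catches omissions; it does not validate the values themselves.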

Topic names

By default, Decodable uses the stream name as the name of the topic it writes to. If a topic already exists with that name, Decodable will write to the existing topic. If it doesn’t exist, Decodable will create it.

You can change the name of the topic to which Decodable writes either in the web interface, or by using output-resource-name-template when calling decodable connection scan.
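The default naming behavior can be pictured with a short Python sketch that builds stream-to-topic mappings. The stream_name, external_resource_specifier, and topic keys mirror the connection YAML shown on this page; the function name and the topic_overrides argument are hypothetical and do not model how output-resource-name-template itself works.

```python
def build_stream_mappings(stream_names, topic_overrides=None):
    """Map each stream to a topic, defaulting the topic name to the stream name."""
    topic_overrides = topic_overrides or {}
    return [
        {
            "stream_name": name,
            # Use the override if one was given, otherwise the stream name.
            "external_resource_specifier": {"topic": topic_overrides.get(name, name)},
        }
        for name in stream_names
    ]


mappings = build_stream_mappings(
    ["avro_out_1", "avro_out_2"],
    topic_overrides={"avro_out_2": "sink_topic_2"},
)
for mapping in mappings:
    print(mapping["stream_name"], "->", mapping["external_resource_specifier"]["topic"])
```

In this example avro_out_1 keeps its stream name as the topic name, while avro_out_2 is written to sink_topic_2.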

Writing data to multiple topics

A single instance of this connector can write to multiple topics.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to write to using decodable connection scan.

Resource specifier keys

When using the decodable connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

Name | Description
-------------------------------
topic | The topic name

Serialization formats

The following formats are supported:

Format | Description
-------------------------------
json                    | JSON data.
avro                    | Plain Avro data using the schemas of the Streams attached to the connection.
avro-confluent          | Avro data using a predefined schema stored in a schema registry.
debezium-json           | A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector.
debezium-avro-confluent | A unified format schema for changelogs with additional support for serializing messages using Avro with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector.
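The format guidance above can be expressed as a simple check. This Python sketch is illustrative only: the function name and the "append"/"change" labels are hypothetical, and it enforces just the one rule stated in the table, namely that CDC data calls for one of the two Debezium formats. It makes no claim about which formats append streams accept.

```python
SUPPORTED_FORMATS = (
    "json",
    "avro",
    "avro-confluent",
    "debezium-json",
    "debezium-avro-confluent",
)
CDC_FORMATS = {"debezium-json", "debezium-avro-confluent"}


def check_value_format(value_format, stream_type):
    """Validate a value.format choice against the guidance in the table above."""
    if value_format not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported format: {value_format!r}")
    # Assumption: "change" is an illustrative label for a change (CDC) stream.
    if stream_type == "change" and value_format not in CDC_FORMATS:
        raise ValueError(
            f"{value_format!r} is not a CDC format; use one of {sorted(CDC_FORMATS)}"
        )
    return value_format


print(check_value_format("debezium-json", "change"))
print(check_value_format("json", "append"))
```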

Avro and Debezium-Avro with Schema Registry

In this mode, the Avro schemas are derived from the schemas of the Streams attached to the connection, but they are validated and registered against the configured schema registry.

To use Avro or Debezium-Avro with a Schema Registry such as Confluent’s, additional connection properties are required.

Here is an example resource YAML that describes a Confluent Cloud connection with the avro-confluent format that you can create using the Decodable CLI:

---
kind: connection
metadata:
  name: cc-avro-confluent-sink
  description: Confluent Cloud sink connection with avro-confluent format
spec_version: v2
spec:
  connector: confluent-cloud
  type: sink
  stream_mappings:
    - stream_name: avro_out_1
      external_resource_specifier:
        topic: sink_topic_1
    - stream_name: avro_out_2
      external_resource_specifier:
        topic: sink_topic_2
  properties:
    value.format: avro-confluent
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4
    cluster.id: lkc-54321
    confluent-registry.url: https://example-registry.us-east-2.aws.confluent.cloud
    confluent-registry.api-key: ABCDJ6B2WQMHXL7K
    confluent-registry.api-secret: 5b17d53f

Connector starting state and offsets

A new sink connection starts reading from the Latest point in the source Decodable streams. This means that only data written to the streams after the connection has started is sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection, it continues reading from the point it most recently stored in a checkpoint before the connection stopped. You can also discard the connection’s state and restart it afresh from Earliest or Latest, as described above.

To learn more, see the Decodable documentation on starting state.
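The start and restart rules in this section can be summarized as a small decision function. This is a sketch of the behavior as described here, not Decodable's implementation: the function name, its parameters, and the "Checkpoint" return label are hypothetical.

```python
def resolve_start_position(has_checkpoint, discard_state=False, requested=None):
    """Return where a sink connection would begin reading its source streams.

    has_checkpoint: True when the connection has run before and stored a checkpoint.
    discard_state:  True when you choose to discard the connection's state.
    requested:      optional override, either "Earliest" or "Latest".
    """
    if requested not in (None, "Earliest", "Latest"):
        raise ValueError(f"unknown start position: {requested!r}")
    # A restart resumes from the checkpoint unless the state is discarded.
    if has_checkpoint and not discard_state:
        return "Checkpoint"
    # New connections, and restarts with discarded state, default to Latest.
    return requested or "Latest"


print(resolve_start_position(has_checkpoint=False))
print(resolve_start_position(has_checkpoint=False, requested="Earliest"))
print(resolve_start_position(has_checkpoint=True))
print(resolve_start_position(has_checkpoint=True, discard_state=True, requested="Earliest"))
```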

How to connect to your Confluent Cloud cluster

Cluster Details

  • The cluster ID and API endpoint can be found on the Cluster overview -> Cluster settings page, in the Identification box.

This image shows the Cluster settings page under the Cluster overview tab.

  • API keys and secrets can be created in Confluent Cloud by clicking Data Integration -> API keys. Here you can see any API keys you’ve already created, or create a new one with the "Add key" button. Save the API key and secret pair somewhere you can find it, because you won’t be able to retrieve the secret again through the UI.

This image shows the API keys page under the Data integration tab.

Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection will write those key fields to your topics as the message key. value.fields-include determines whether the primary or partition key fields should be included in the message value as well.

When value.fields-include = ALL, the primary or partition key fields will be written to both the message key and message value.

When value.fields-include = EXCEPT_KEY, the primary or partition key fields will only be included in the message key.

EXCEPT_KEY cannot be set when a primary or partition key does not exist in your input streams.

Example

For this example, assume an input Stream has three fields:

  • key (the primary or partition key field)

  • field1

  • field2

And, assume a record in the Stream is as follows:

Field Name | Value
-------------------------------
key        | key_value
field1     | field1_value
field2     | field2_value

When value.fields-include = ALL, the resulting message key and message value written to your topic will be:

Message Key

{
    "key": "key_value"
}

Message Value

{
    "key": "key_value",
    "field1": "field1_value",
    "field2": "field2_value"
}

When value.fields-include = EXCEPT_KEY, the resulting message key and message value written to your topic will be:

Message Key

{
    "key": "key_value"
}

Message Value

{
    "field1": "field1_value",
    "field2": "field2_value"
}
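The same behavior can be sketched in Python. This is only an illustration of the policy described in this section, not the connector's implementation; the function name and its parameters are hypothetical.

```python
def build_message(record, key_fields, fields_include="ALL"):
    """Split a record into (message_key, message_value) per value.fields-include."""
    if fields_include not in ("ALL", "EXCEPT_KEY"):
        raise ValueError(f"unknown policy: {fields_include!r}")
    if fields_include == "EXCEPT_KEY" and not key_fields:
        # EXCEPT_KEY cannot be set when the stream has no primary or partition key.
        raise ValueError("EXCEPT_KEY requires a primary or partition key")

    # Key fields always go to the message key (None here when there is no key).
    key = {field: record[field] for field in key_fields} if key_fields else None
    if fields_include == "ALL":
        value = dict(record)
    else:
        value = {f: v for f, v in record.items() if f not in key_fields}
    return key, value


record = {"key": "key_value", "field1": "field1_value", "field2": "field2_value"}
print(build_message(record, ["key"], "ALL"))
print(build_message(record, ["key"], "EXCEPT_KEY"))
```

With ALL, the value contains all three fields; with EXCEPT_KEY, the value contains only field1 and field2, matching the messages shown above.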