Confluent Cloud sink connector

Features

Connector name           confluent-cloud
Delivery guarantee       At least once
Supported task sizes     S, M, L
Multiplex capability     A single instance of this connector can write to a single topic.
Supported stream types   Append stream

Configuration properties

Connection

cluster.id
  The ID of your Confluent cluster.
  Required: Yes

cluster.api.endpoint
  The API endpoint for your Confluent cluster.
  Required: Yes

cluster.api.key
  The API key for your Confluent cluster.
  Required: Yes

cluster.api.secret
  The API secret for your Confluent cluster. This must be provided as a secret resource.
  Required: Yes

Data

topic
  The name of the topic to write data to.
  Required: Yes

key.format
  The format for the key, if present, in the message. See Serialization formats for the supported values.
  Required: Only when the input streams contain either a primary key or a partition key.

key.fields
  A semicolon-delimited list of the field names that comprise the message key. For nested fields, specify the root key only and define the nested fields as part of the connection’s schema.

value.format
  The format for the value payload in the message. Must be one of:
    • avro
    • avro-confluent (see below for further properties)
    • json
    • debezium-json
    • debezium-avro-confluent
    • raw
  See Serialization formats for more details.

value.fields-include
  Where primary/partition key fields should appear in the output messages.
    • ALL: the message key and value
    • EXCEPT_KEY: the message key only
  See the Field Inclusion Policy section for more details on setting this property.
  Default: ALL

Format: Avro

confluent-registry.url
  The URL of the schema registry to use.
  Required: Yes

confluent-registry.api-key
  The API key for the Schema Registry.
  Required: Yes

confluent-registry.api-secret
  The API secret for the Schema Registry. This must be provided as a secret resource.
  Required: Yes
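
For example, here is a minimal sketch of creating a sink connection that writes JSON records using the CLI. It assumes the property names from the table above can be passed directly as --prop values; the field names are placeholders, the topic, endpoint, cluster ID, and credential values are taken from the examples below, and the cluster.api.secret value refers to the ID of a secret resource rather than the literal secret:

$ decodable connection create --connector confluent-cloud --type sink \
    --name clickstream_json_sink \
    --description "An example sink connection writing JSON records" \
    --field request_id=string \
    --field user_id=int \
    --prop value.format="json" \
    --prop topic="clickstream_topic" \
    --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
    --prop cluster.api.key="NF2GERAAFNABCDEF" \
    --prop cluster.api.secret="d5d7a670" \
    --prop cluster.id="lkc-98765"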

Serialization formats

The following formats are supported:

json
  JSON data.

raw
  Raw (byte-based) values as a single column.

avro
  Plain Avro data using the schema defined when you create the connection.

avro-confluent
  Avro data using a predefined schema stored in a schema registry.

debezium-json
  A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector.

debezium-avro-confluent
  A unified format schema for changelogs, with additional support for serializing messages using Avro, with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector.

Avro

A popular choice for data serialization on Confluent Cloud is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

When using plain Avro (without the schema held in a Schema Registry), the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

$ decodable connection get 2428ebe9
confluent_cloud_sink
  id                       2428ebe9
  description              -
  connector                confluent-cloud
  type                     sink
  stream id                0dc6bcbc
  fields
    0  request_id          STRING
    1  user_id             INT
    2  ts                  BIGINT
    3  endpoint            STRING
  primary key fields       -
  properties
    cluster.api.endpoint   https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key        NF2GERAAFNABCDEF
    cluster.api.secret     d5d7a670
    cluster.id             lkc-98765
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"request_id","type":["null","string"],"default":null},{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    topic                  clickstream_topic
[…]

Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry.

To use Avro with a Schema Registry such as Confluent’s, additional connection properties are required. These can be specified when you create the connection using the CLI or API. If you create the connection through the UI, you will need to use the CLI or API to update the connection and add the necessary properties.

For example, to use the CLI to create a Confluent Cloud sink connection that writes Avro messages validated against a secured schema registry:

$ decodable connection create --connector confluent-cloud --type sink \
    --name avro_registry_sink \
    --description "An example connection with schema registry validation" \
    --field value=string \
    --prop format="avro-confluent" \
    --prop topic="my-topic" \
    --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
    --prop cluster.api.key="ABCDERAAFNJDHZL" \
    --prop cluster.api.secret="605079f4" \
    --prop cluster.id="lkc-54321" \
    --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
    --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
    --prop confluent-registry.api-secret="5b17d53f"

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started will be sent to the external system. You can override this to Earliest when you start the connection if you want to send all of the existing data on the source stream to the target system, along with any new data that arrives on the stream.

When you restart a sink connection it will continue to read data from the most recent checkpoint taken before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest, as described above.

Learn more about starting state here.

Field Inclusion Policy

If your input streams contain either a primary key or a partition key, the connection will write those key fields to your topic as the message key. value.fields-include determines whether the primary or partition key fields should be included in the message value as well.

When value.fields-include = ALL, the primary or partition key fields will be written to both the message key and message value.

When value.fields-include = EXCEPT_KEY, the primary or partition key fields will only be included in the message key.

EXCEPT_KEY cannot be set when a primary or partition key does not exist in your input streams.
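
As an illustration, consider a hypothetical input stream with primary key order_id and fields order_id and status, written with json key and value formats; the two settings differ only in the value payload:

value.fields-include = ALL
  key:   {"order_id": 42}
  value: {"order_id": 42, "status": "shipped"}

value.fields-include = EXCEPT_KEY
  key:   {"order_id": 42}
  value: {"status": "shipped"}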