Confluent Cloud source connector

Features

Connector name: confluent-cloud
Delivery guarantee: Exactly once
Supported task sizes: S, M, L

A single instance of this connector can read from multiple topics.

Supported stream types

  • Append stream
  • Change stream

Although the Confluent Cloud connector supports both types of stream, a single Confluent Cloud connection may only use one type of stream. If you want to use both change streams and append streams, then you must create separate Confluent Cloud connections.

Configuration properties

Each property below is listed with its description, whether it is required, and its default value where one exists.

Connection

cluster.id
The ID of your Confluent cluster.
Required: Yes

cluster.api.endpoint
The API endpoint for your Confluent cluster.
Required: Yes

cluster.api.key
The API key for your Confluent cluster.
Required: Yes

cluster.api.secret
The API secret for your Confluent cluster. This must be provided as a secret resource.
Required: Yes

Data

key.format
The format for the key, if present, in the message. Must be one of the supported serialization formats; see Serialization formats for more details.
Required: Only when value.fields-include = EXCEPT_KEY. See the Field Inclusion Policy section for more details.

value.format
The format for the value payload in the message. Must be one of the supported serialization formats; see Serialization formats for more details.

value.fields-include
Where the data written to the Decodable payload should be read from:
  • ALL: the message value only.
  • EXCEPT_KEY: the message key and value.
See the Field Inclusion Policy section for more details on setting this property.
Default: ALL

Format: Avro

confluent-registry.url
The URL of the schema registry to use.
Required: Yes

confluent-registry.api-key
The API key for the Schema Registry.
Required: Yes

confluent-registry.api-secret
The API secret for the Schema Registry. This must be provided as a secret resource.
Required: Yes

Advanced

parse-error-policy
Select the error handling policy. Must be one of the following:
  • FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.
  • IGNORE: Decodable ignores all invalid records. All validated records are sent. With this policy, the connection isn’t stopped, and no errors or warnings are shown when an invalid record is encountered.

scan.startup.mode
Specifies where to start reading data when the connection is first started, or when it’s restarted with its state discarded. Must be one of the following:
  • group-offsets: Start reading data from the committed offsets of a specified consumer group.
  • earliest-offset: Start reading data from the earliest available point in the stream.
  • latest-offset: Start reading data from the latest available point in the stream.
  • timestamp: Start reading data from a specific timestamp.
Default: latest-offset

properties.group.id
The group ID used by the consumer.
Required: Only if scan.startup.mode is set to group-offsets.

scan.startup.timestamp-millis
The timestamp from which the consumer starts reading, in milliseconds since January 1, 1970 00:00:00.000 GMT.
Required: Only if scan.startup.mode is set to timestamp.

properties.auto.offset.reset
What to do when there is no initial offset in Confluent Cloud (for example, a new consumer group), or if the specified offset no longer exists on the server (for example, because the data has been deleted).
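The startup-related properties are set alongside the connection’s other properties. The following is a minimal, hypothetical sketch of the relevant fragment of the properties block used in the full example further down this page; the timestamp and consumer group values are illustrative only:

properties:
  scan.startup.mode: timestamp
  scan.startup.timestamp-millis: 1700000000000   # hypothetical start time, in milliseconds since the epoch
  # Alternatively, resume from a consumer group's committed offsets:
  # scan.startup.mode: group-offsets
  # properties.group.id: my_consumer_group       # hypothetical consumer group ID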

Ingesting data from multiple topics

A single instance of this connector can read from multiple topics.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

  • topic: The topic name
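When ingesting from multiple topics, each stream mapping in the connection definition identifies its topic with the topic specifier key. The following sketch mirrors the structure of the full example further down this page; the stream and topic names are hypothetical placeholders:

stream_mappings:
  - stream_name: orders_stream
    external_resource_specifier:
      topic: orders
  - stream_name: customers_stream
    external_resource_specifier:
      topic: customers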

Serialization formats

The following formats are supported:

json
JSON data.

avro
Plain Avro data using the schemas of the Streams attached to the connection.

avro-confluent
Avro data using a predefined schema stored in a schema registry.

debezium-json
A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector.

debezium-avro-confluent
A unified format schema for changelogs, with additional support for serializing messages using Avro with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector.
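For instance, to ingest Debezium-formatted CDC records without a schema registry, the format is selected with the value.format property; this one-line sketch leaves the rest of the connection definition unchanged:

value.format: debezium-json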

Avro and Debezium-Avro with Schema Registry

In this mode, the Avro schemas are derived from the schemas of the Streams attached to the connection, but they are validated and registered against the configured schema registry.

To use Avro or Debezium-Avro with a Schema Registry such as Confluent’s, additional connection properties are required.

Here is an example resource YAML that describes a Confluent Cloud connection with the avro-confluent format that you can create using the Decodable CLI:

---
kind: connection
metadata:
  name: cc-avro-confluent-source
  description: Confluent Cloud source connection with avro-confluent format
spec_version: v2
spec:
  connector: confluent-cloud
  type: source
  stream_mappings:
    - stream_name: avro_in_1
      external_resource_specifier:
        topic: source_topic_1
    - stream_name: avro_in_2
      external_resource_specifier:
        topic: source_topic_2
  properties:
    value.format: avro-confluent
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4
    cluster.id: lkc-54321
    confluent-registry.url: https://example-registry.us-east-2.aws.confluent.cloud
    confluent-registry.api-key: ABCDJ6B2WQMHXL7K
    confluent-registry.api-secret: 5b17d53f

Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by the scan.startup.mode property. By default this is latest-offset, which means the connection reads from the end of the topic.

Learn more about starting state here.
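For example, to re-read a topic from the beginning when a connection is created, or restarted with its state discarded, the default can be overridden in the connection’s properties block. A minimal sketch:

properties:
  scan.startup.mode: earliest-offset   # read from the earliest available offset instead of the latest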

How to connect to your Confluent Cloud cluster

Cluster Details

  • The cluster ID and API endpoint can be found on the Cluster overview -> Cluster settings page, in the Identification box.

This image shows the Cluster settings page under the Cluster overview tab.

  • API keys and secrets can be created in Confluent Cloud by clicking Data Integration -> API keys. Here you can see any API keys you’ve already created, or create a new one with the "Add key" button. Make sure you save the API key and secret pair somewhere you can find it, since you won’t be able to retrieve the secret again through the UI.

This image shows the API keys page under the Data integration tab.

Field Inclusion Policy

Depending on the structure of your input messages, you may want to read the message key. The value.fields-include property determines whether an input message’s key is projected into the Decodable output payload; when applicable, the key.format property describes the format of the message key.

When value.fields-include = ALL, the message key is ignored. All output fields are sourced from the message value, including primary or partition key fields when applicable.

When value.fields-include = EXCEPT_KEY, the message key is deserialized and its fields are projected into the output record. The structure of a message key must match the primary or partition key fields defined on the receiving stream. The rest of the output record’s fields are sourced from the message value.

key.format is required when value.fields-include = EXCEPT_KEY.
EXCEPT_KEY cannot be set when a primary or partition key does not exist in your output streams.
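A minimal sketch of the relevant connection properties for this mode, assuming JSON-encoded keys and values (other serialization formats follow the same pattern):

key.format: json                    # required because value.fields-include is EXCEPT_KEY
value.format: json
value.fields-include: EXCEPT_KEY    # also project the message key's fields into the output record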

Example

For this example, assume the following message key and message value pair is read from your topic:

Message Key

{
    "key": "from_message_key"
}

Message Value

{
    "key": "from_message_value",
    "field1": "field1_value",
    "field2": "field2_value"
}

Assume the output Stream has three fields:

  • key (the primary or partition key field)

  • field1

  • field2

When value.fields-include = ALL, the resulting Decodable payload will be:

Field Name | Value
-------------------------------
key        | from_message_value
field1     | field1_value
field2     | field2_value

When value.fields-include = EXCEPT_KEY, the resulting Decodable payload will be:

Field Name | Value
-------------------------------
key        | from_message_key
field1     | field1_value
field2     | field2_value