Confluent Cloud source connector

Features

Connector name           confluent-cloud
Delivery guarantee       Exactly once
Supported task sizes     S, M, L
Multiplex capability     A single instance of this connector can read from a single topic.
Supported stream types   Append stream

Configuration properties

Connection

cluster.id
    The ID of your Confluent cluster.
    Required: yes

cluster.api.endpoint
    The API endpoint for your Confluent cluster.
    Required: yes

cluster.api.key
    The API key for your Confluent cluster.
    Required: yes

cluster.api.secret
    The API secret for your Confluent cluster. This must be provided as a secret resource.
    Required: yes

Data

topic
    The name of the topic from which to read data.
    Required: yes

key.format
    The format for the key, if present, in the message. Must be one of the formats listed under Serialization formats.

key.fields
    A list of the key’s field names, delimited by semicolons, that comprise the message key. For nested fields, specify the root key only, and define the nested fields as part of the connection’s schema. See Message key for more details and an example of using message keys.

value.format
    The format for the value payload in the message. Must be one of:
      • avro
      • avro-confluent (see below for further properties)
      • json
      • debezium-json
      • debezium-avro-confluent
      • raw
    See Serialization formats for more details.

value.fields-include
    Which fields to include in the payload.
      • ALL: all fields are written, including the fields from the key
      • EXCEPT_KEY: all fields from the value of the message are written, but not those from the key
    Default: ALL
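
Putting the connection and data properties together, a minimal source connection needs the cluster details plus the topic and a value format. The following is a sketch using the CLI; the connection name, field, and all credential and topic values are placeholders, reused from the examples later on this page:

decodable connection create --connector confluent-cloud --type source \
    --name clickstream_source \
    --prop cluster.id=lkc-98765 \
    --prop cluster.api.endpoint=https://example.us-west-2.aws.confluent.cloud:443 \
    --prop cluster.api.key=NF2GERAAFNABCDEF \
    --prop cluster.api.secret=d5d7a670 \
    --prop topic=clickstream_topic \
    --prop value.format=json \
    --field request_id=string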

Format: Avro

confluent-registry.url
    The URL of the schema registry to use.
    Required: yes

confluent-registry.api-key
    The API key for the Schema Registry.
    Required: yes

confluent-registry.api-secret
    The API secret for the Schema Registry. This must be provided as a secret resource.
    Required: yes

Advanced

parse-error-policy
    The error handling policy. Must be one of the following:
      • FAIL: Decodable stops the connection if any validation errors are encountered in the incoming data stream.
      • IGNORE: Decodable ignores all invalid records; all valid records are sent. With this policy, the connection isn’t stopped, and no errors or warnings are shown if there is an invalid record.

scan.startup.mode
    Specifies where to start reading data when the connection is first started, or when it’s restarted with its state discarded. Must be one of the following:
      • group-offsets: Start reading data from the committed offsets of a specified consumer group.
      • earliest-offset: Start reading data from the earliest available point in the stream.
      • latest-offset: Start reading data from the latest available point in the stream.
      • timestamp: Start reading data from a specific timestamp.
      • specific-offset: Start reading data from a specific offset value.
    Default: latest-offset

properties.group.id
    The group ID used by the consumer. Only required if scan.startup.mode is set to group-offsets.

properties.auto.offset.reset
    What to do when there is no initial offset in Confluent Cloud (for example, a new consumer group), or if the specified offset no longer exists on the server (for example, because of data deletion).
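
For example, to have an existing connection resume from a consumer group’s committed offsets, you can combine scan.startup.mode, properties.group.id, and properties.auto.offset.reset. A minimal sketch using the CLI; the connection ID and group name here are hypothetical, and auto.offset.reset takes the standard Kafka consumer values (earliest, latest, none):

decodable connection update 2428ebe9 \
    --prop scan.startup.mode=group-offsets \
    --prop properties.group.id=my-consumer-group \
    --prop properties.auto.offset.reset=earliest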

Serialization formats

The following formats are supported:

json
    JSON data.

raw
    Raw (byte-based) values as a single column.

avro
    Plain Avro data using the schema defined when you create the connection.

avro-confluent
    Avro data using a predefined schema stored in a schema registry.

debezium-json
    A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector.

debezium-avro-confluent
    A unified format schema for changelogs, with additional support for serializing messages using Avro, with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector.
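
For example, to ingest CDC data you would set value.format to one of the Debezium formats when creating the connection. A sketch of this, reusing the hypothetical cluster details from the other examples on this page; the connection name, topic, and field are also placeholders:

decodable connection create --connector confluent-cloud --type source \
    --name cdc_orders_source \
    --prop value.format=debezium-json \
    --prop topic=orders-cdc \
    --prop cluster.id=lkc-98765 \
    --prop cluster.api.endpoint=https://example.us-west-2.aws.confluent.cloud:443 \
    --prop cluster.api.key=NF2GERAAFNABCDEF \
    --prop cluster.api.secret=d5d7a670 \
    --field order_id=int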

Avro

A popular choice for data serialization on Confluent Cloud is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

When using plain Avro (without the schema held in a Schema Registry), the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

$ decodable connection get 2428ebe9
confluent_cloud_source
  id                       2428ebe9
  description              -
  connector                confluent-cloud
  type                     source
  stream id                0dc6bcbc
  fields
    0  request_id          STRING
    1  user_id             INT
    2  ts                  BIGINT
    3  endpoint            STRING
  primary key fields       -
  properties
    cluster.api.endpoint   https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key        NF2GERAAFNABCDEF
    cluster.api.secret     d5d7a670
    cluster.id             lkc-98765
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"request_id","type":["null","string"],"default":null},{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    topic                  clickstream_topic
[…]

Avro with Schema Registry

In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry.

To use Avro with a Schema Registry such as Confluent’s, additional connection properties are required. These can be specified when you create the connection using the CLI or API. If you create the connection through the UI, you’ll need to use the CLI or API afterwards to update the connection with the necessary properties.

For example, to use the CLI to create a Confluent Cloud source connection that reads Avro messages validated against a secured schema registry:

$ decodable connection create --connector confluent-cloud --type source \
    --name avro_registry_source \
    --description "An example connection with schema registry validation" \
    --field value=string \
    --prop format="avro-confluent" \
    --prop topic="my-topic" \
    --prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
    --prop cluster.api.key="ABCDERAAFNJDHZL" \
    --prop cluster.api.secret="605079f4" \
    --prop cluster.id="lkc-54321" \
    --prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
    --prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
    --prop confluent-registry.api-secret="5b17d53f"

Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by scan.startup.mode. By default this is latest-offset, which means it reads from the end of the topic.

Learn more about starting state here.
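
If you want a connection to re-read the topic from the beginning the next time it’s activated with discarded state, you can change the property first. A minimal sketch; the connection ID is hypothetical:

decodable connection update 2428ebe9 \
    --prop scan.startup.mode=earliest-offset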

How to connect to your Confluent Cloud cluster

  • The cluster ID and API endpoint can be found on the Cluster overview -> Cluster settings page, in the Identification box.

This image shows the Cluster settings page under the Cluster overview tab.
  • API keys and secrets can be created in Confluent Cloud by clicking Data Integration -> API keys. Here you can see any API keys you’ve already created, or create a new one with the "Add key" button. Make sure you save the API key and secret pair somewhere you can find it, since you won’t be able to retrieve the secret again through the UI.

This image shows the API keys page under the Data integration tab.
  • Click "Next".

Choose a topic

  • Now select the topic you want to connect to.

  • If you’re using a newly created API key, you may not see any topics yet, because it takes a minute or two for Confluent to propagate the new key through the system. Try waiting a bit and then clicking "Refresh Topics".

Select a stream to connect to

  • Either select the existing Decodable stream this connection will be connected to, or click "New Stream" to create a new stream for the connection

  • If you’re connecting to an existing stream, you can verify the schema and rename fields, but not add or remove any fields

  • If you’re creating a new stream, you have three options for defining your schema:

    1. Manually define a new schema by adding fields and giving them types

    2. Select "Structured Schema Definition" from the drop down to upload a schema file that you already have. Select the appropriate type of schema from the "Schema Type" dropdown, then type or paste the schema definition into the "Schema" text box.

    3. Select "Schema Registry" to connect to your Confluent Cloud schema registry and get the schema from there

  • You can get the required information by clicking "Schema Registry" in the Confluent Cloud sidebar.

This image highlights the Schema Registry option in the sidebar.
  • Copy the API endpoint from the API endpoint box.

  • API key/secret pairs can be created in the API credentials box. Just like for the cluster, make sure to save these because you won’t be able to access the secrets from the UI again.

This image shows that an API key and secret was generated. The values of the key and secret are shown once after creation.

Message key

To read the data held in the message key, you need to do the following:

  1. Specify the format of the data in the key with key.format.

  2. Specify the names of the fields in the key with key.fields.

  3. Set value.fields-include=EXCEPT_KEY.

    1. If you are using the Decodable web interface to create the connection, you will need to use the CLI (as shown below) or API after creating the connection to update it with this property.

  4. Add the key fields and their data types to the connection schema.

    Do not set these as primary key in the schema.

Example

This example is based on a Confluent Cloud topic in which the key looks like this:

{
    "tenant": {
        "id": "acme-corp",
        "region": "us-west-2"
    }
}

and the value like this:

{
    "product_id": 1,
    "order_ts": 1534772501276,
    "total_amount": 10.50
}

CLI

To create a connection based on the above message key and value directly with the CLI, use:

decodable connection create --connector confluent-cloud --type source \
    --name orders \
    --prop cluster.id=lkc-54321 \
    --prop cluster.api.endpoint=https://example.us-west-2.aws.confluent.cloud:443 \
    --prop cluster.api.key=ABCDERAAFNJDHZL \
    --prop cluster.api.secret=605079f4 \
    --prop value.format=json \
    --prop key.fields=tenant \
    --prop key.format=json \
    --prop parse-error-policy=FAIL \
    --prop properties.auto.offset.reset=none \
    --prop scan.startup.mode=earliest-offset \
    --prop topic=orders \
    --prop value.fields-include=EXCEPT_KEY \
    --field tenant="ROW<id STRING, region STRING>" \
    --field product_id="INT" \
    --field order_ts="BIGINT" \
    --field total_amount="FLOAT"

Web app

To configure a connection in the Decodable web app do the following:

  1. Create a connection to your Confluent Cloud broker including the following settings:

    Key Format

    JSON

    Key Fields

    tenant

    Value Format

    JSON

  2. Define the connection schema. This must include the field from the key (tenant).

    Do not mark it as a primary key field.

    Screenshot of the schema configuration
  3. Save the connection, but don’t activate it yet. Make a note of your connection ID.

    Screenshot of connector screen showing its id
  4. Using the Decodable CLI, update your connection (using the ID from the previous step) to add the value.fields-include property:

    decodable connection update 553feb9e --prop value.fields-include=EXCEPT_KEY
  5. Activate your connection as normal

  6. The data from the Confluent Cloud message key will be included in the stream:

    Screenshot of stream preview