Confluent Cloud source connector

You can use the Confluent Cloud connector to get data from or send data to a Confluent Cloud cluster. This topic describes how to configure the Confluent Cloud connector to get data from Confluent Cloud into Decodable. For information about sending data from Decodable to Confluent Cloud, see Confluent Cloud in the Connect to a data destination chapter.

Features

Delivery guarantee

At least once or exactly once, based on configuration

Steps

If you want to use the Decodable CLI or API to create the connection, refer to the Property Name column for the underlying property names. The connector name is confluent-cloud.
  1. From the Connections page, select the Confluent Cloud connector and complete the following fields.

    UI Field Property Name Description

    Connection Type

    N/A

    Select source to use this connector to get data into Decodable.

    Cluster ID

    cluster.id

    Specify the ID of your Confluent cluster.

    Cluster API Endpoint

    cluster.api.endpoint

    The API endpoint for your Confluent cluster.

    Cluster API Key

    cluster.api.key

    The API key for your Confluent cluster.

    Cluster API Secret

    cluster.api.secret

    The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

    Topic

    topic

    Optional. The topic that you want to receive data from.

    Value Format

    value.format

    The format for data in the Confluent cluster. Must be one of the following.

    • JSON: Read and write JSON data based on a JSON schema.

    • RAW: Read and write raw (byte based) values as a single column.

    • AVRO: Read and write Avro data based on an Avro schema. If you are using the Decodable CLI to create or configure this connection, specify avro-confluent to use Avro with a schema registry. In this mode, the Avro schema that you define is validated and registered against the configured schema registry. Otherwise, specify avro to generate the Avro schema from the connection schema; the generated schema is shown in the inferred_schema connection property. For more information on the additional properties that the Decodable CLI uses for Avro formats, see Avro.

    • Debezium (JSON): A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option if you want to send Change Data Capture (CDC) data through this connector.

    Parse error policy

    parse-error-policy

    Select the error handling policy. Must be one of the following:

    • FAIL: When set to FAIL, Decodable stops the connection if any validation errors are encountered in the incoming data stream.

    • IGNORE: When set to IGNORE, Decodable drops invalid records and sends all valid records. With this policy, the connection isn't stopped, and no errors or warnings are shown for invalid records.

    Scan Startup Mode

    scan.startup.mode

    Specifies where to start reading data when the connection is first started, or when it’s restarted with the state discarded.

    Must be one of the following:

    • group-offsets: Start reading data from the committed offsets of the consumer group specified in Consumer Group ID.

    • earliest-offset: Start reading data from the earliest available point in the stream.

    • latest-offset (default): Start reading data from the latest available point in the stream.

    • timestamp: Start reading data from a specific timestamp.

    • specific-offset: Start reading data from a specific offset value.

    Consumer Group ID

    properties.group.id

    The group ID used by the consumer. Optional, unless the Scan Startup Mode is set to group-offsets.

    Consumer Offset Reset Strategy

    properties.auto.offset.reset

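    Select the behavior to use when there is no initial offset in Confluent Cloud (for example, for a new consumer group) or when the specified offset no longer exists (for example, because the data was deleted). This corresponds to the Kafka consumer setting auto.offset.reset, whose standard values are earliest, latest, and none.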

  2. Enter the Confluent Cloud topic that you’d like to receive data from. Then, select Next.

  3. Select the stream that you’d like to connect to this connector. Then, select Next.

  4. Define the connection’s schema. Decodable can automatically populate the connection’s schema using Confluent’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition.

    1. If you want to automatically populate the connection schema using Confluent's schema registry, you also need to provide the Schema Registry Endpoint, Schema Registry API Key, and Schema Registry API Secret that Decodable uses to communicate with your schema registry.

  5. Select Next when you have finished defining the connection's schema.

  6. Give the newly created connection a Name and Description and select Save.
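If you create the connection with the Decodable CLI instead of the UI, the following is a minimal sketch that wraps decodable connection create using the property names from the table in step 1. It assumes the RAW value format written as value.format=raw (lowercase), a single string field named value, and hypothetical environment variables CC_CLUSTER_ID, CC_API_ENDPOINT, CC_API_KEY, and CC_API_SECRET_ID (the last one holds the ID of a Decodable secret, not the secret itself). The function name create_confluent_source is also hypothetical. Before calling the CLI, it rejects startup modes outside the documented list and refuses group-offsets without a consumer group ID. It doesn't handle any additional properties that the timestamp or specific-offset modes might need.

```shell
#!/usr/bin/env bash
# Sketch only: wraps `decodable connection create` for a RAW-format source and
# checks the Scan Startup Mode rules before invoking the CLI.
# Assumptions: CC_* variable names are hypothetical; value.format=raw is assumed.

create_confluent_source() {
  local name="$1" topic="$2" startup_mode="$3" group_id="${4:-}"

  # Fail early if any required cluster setting is missing from the environment.
  local var
  for var in CC_CLUSTER_ID CC_API_ENDPOINT CC_API_KEY CC_API_SECRET_ID; do
    if [[ -z "${!var:-}" ]]; then
      echo "error: $var is not set" >&2
      return 1
    fi
  done

  # Accept only the values documented for scan.startup.mode.
  case "$startup_mode" in
    group-offsets|earliest-offset|latest-offset|timestamp|specific-offset) ;;
    *)
      echo "error: unsupported scan.startup.mode '$startup_mode'" >&2
      return 1
      ;;
  esac

  # properties.group.id is optional unless the mode is group-offsets.
  if [[ "$startup_mode" == group-offsets && -z "$group_id" ]]; then
    echo "error: group-offsets requires a consumer group ID" >&2
    return 1
  fi

  # cluster.api.secret receives the ID of a Decodable secret resource.
  local args=(
    connection create
    --connector confluent-cloud
    --type source
    --name "$name"
    --field value=string
    --prop value.format=raw
    --prop topic="$topic"
    --prop scan.startup.mode="$startup_mode"
    --prop parse-error-policy=FAIL
    --prop cluster.id="$CC_CLUSTER_ID"
    --prop cluster.api.endpoint="$CC_API_ENDPOINT"
    --prop cluster.api.key="$CC_API_KEY"
    --prop cluster.api.secret="$CC_API_SECRET_ID"
  )

  # Add the consumer group only when one was supplied.
  if [[ -n "$group_id" ]]; then
    args+=(--prop properties.group.id="$group_id")
  fi

  decodable "${args[@]}"
}
```

For example, after exporting the four variables, create_confluent_source orders_raw orders group-offsets orders-consumer would create a connection that starts from the committed offsets of the orders-consumer group, while omitting the last argument stops with an error before the CLI is called.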

Serialization formats: Avro

A popular choice for data serialization on Apache Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

To use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

decodable connection get 57f8ffb6
avro_example_source
  id                       57f8ffb6
  description              An example source connection with plain Avro schema
  connector                confluent-cloud
  type                     source
  stream id                3843bb39
  fields
    0  user_id             INT
    1  ts                  BIGINT
    2  endpoint            STRING
  primary key fields       -
  properties
    cluster.api.endpoint   https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key        ABCDERAAFNJDHZL
    cluster.api.secret     605079f4
    cluster.id             lkc-12345
    confluent-registry.api-key ABCDE6B2WQJDHFUP
    confluent-registry.api-secret 5b17d53f
    confluent-registry.url https://example-registry.us-east-2.aws.confluent.cloud
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    parse-error-policy     FAIL
    properties.auto.offset.reset none
    scan.startup.mode      earliest-offset
    topic                  my_example_topic
  target state             RUNNING
  actual state             RUNNING
  requested tasks          1
  actual tasks             0
  requested task size      M
  actual task size         -
  create time              2023-02-27T19:42:43Z
  update time              2023-02-27T19:43:06Z
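The inferred_schema value in the example follows a simple pattern: each connection field becomes a record field whose type is a union of null and the matching Avro type, with a default of null. The following shell function illustrates that mapping as reconstructed from this one example. It isn't Decodable's schema generator, and it only knows the three types shown here (INT to int, BIGINT to long, STRING to string). The function name avro_schema_from_fields is hypothetical.

```shell
#!/usr/bin/env bash
# Illustration only: rebuilds the shape of the inferred_schema shown above
# (every field nullable, default null). Not Decodable's actual generator;
# only the three types from the example are mapped.

avro_schema_from_fields() {
  local fields="" spec name type avro_type
  for spec in "$@"; do
    name="${spec%%=*}"   # text before the first '='
    type="${spec#*=}"    # text after the first '='
    case "$type" in
      INT)    avro_type=int ;;
      BIGINT) avro_type=long ;;
      STRING) avro_type=string ;;
      *)
        echo "error: no mapping for $type in this sketch" >&2
        return 1
        ;;
    esac
    # Append a nullable field, separated by commas after the first one.
    fields+="${fields:+,}{\"name\":\"$name\",\"type\":[\"null\",\"$avro_type\"],\"default\":null}"
  done
  printf '{"type":"record","name":"record","fields":[%s]}\n' "$fields"
}
```

Running avro_schema_from_fields user_id=INT ts=BIGINT endpoint=STRING prints the same JSON as the inferred_schema property in the example output.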

Avro with Schema Registry

To use Avro with a schema registry, the connection property format=avro-confluent is required. In this mode, the Avro schema is still derived from the connection schema, but it's validated and registered against the configured schema registry.

In this mode, the following properties are used.

Property Required? Description

confluent-registry.url

required

The URL of the schema registry to use.

confluent-registry.api-key

required

The username or API key to use.

confluent-registry.api-secret

required

The password or API secret associated with the username or API key. This must be provided as a secret resource. If you are using the Decodable CLI, run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.
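Before creating the connection, it can help to confirm that the registry URL and credentials work. The following sketch calls the Confluent Schema Registry REST API endpoint GET /subjects/<subject>/versions/latest with HTTP basic authentication using the registry API key and secret. It assumes the default topic name strategy, where a topic's value schema is registered under the subject <topic>-value. The function and variable names (fetch_latest_value_schema, SR_URL, SR_API_KEY, SR_API_SECRET) are hypothetical, and unlike confluent-registry.api-secret, SR_API_SECRET must hold the actual secret value rather than a Decodable secret ID.

```shell
#!/usr/bin/env bash
# Sketch: fetch the latest value schema for a topic from a Confluent schema
# registry to check the URL and credentials.
# Assumptions: default TopicNameStrategy (subject "<topic>-value");
# SR_URL, SR_API_KEY, and SR_API_SECRET are hypothetical variable names.

fetch_latest_value_schema() {
  local topic="$1"
  local base="${SR_URL%/}"   # drop a trailing slash, if any

  # --fail makes curl exit non-zero on HTTP errors such as 401 or 404.
  curl --silent --show-error --fail \
    --user "${SR_API_KEY}:${SR_API_SECRET}" \
    "${base}/subjects/${topic}-value/versions/latest"
}
```

For example, fetch_latest_value_schema my-topic prints the registry's JSON response for the subject my-topic-value; an authentication failure or a missing subject makes curl exit with a non-zero status.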

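The following command is an example of how to create a source connection to Confluent Cloud that uses SASL authentication (with the cluster API key and secret) and a secured schema registry. The values of cluster.api.secret and confluent-registry.api-secret are the IDs of secret resources, not the secret values themselves.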

./decodable connection create --connector confluent-cloud --type source \
--name avro_registry_source \
--description "An example connection with schema registry validation" \
--field value=string \
--prop format="avro-confluent" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="https://example.us-west-2.aws.confluent.cloud:443" \
--prop cluster.api.key="ABCDERAAFNJDHZL" \
--prop cluster.api.secret="605079f4" \
--prop cluster.id="lkc-54321" \
--prop confluent-registry.url="https://example-registry.us-east-2.aws.confluent.cloud" \
--prop confluent-registry.api-key="ABCDJ6B2WQMHXL7K" \
--prop confluent-registry.api-secret="5b17d53f"

Metadata fields

When you connect to Confluent Cloud, you have access to Apache Kafka metadata fields. For the complete list of available metadata fields and instructions for using them, see message metadata.

Connector starting state and offsets

When you create a connection, or restart it with its state discarded, it reads from the position in the topic defined by Scan Startup Mode (scan.startup.mode). The default is latest-offset, so the connection starts at the end of the topic and reads only records that arrive after it starts.

For more information, see the starting state documentation.