Confluent Cloud

Use the Confluent Cloud connector to send data to a Confluent Cloud cluster. To create a connection that gets data into Decodable from Confluent Cloud instead, see Confluent Cloud in the Connect to a data source chapter.

Confluent Cloud connections always use SSL transport encryption, which encrypts all network traffic between Decodable and the brokers.

Overview

Connector name: confluent-cloud

Type: source, sink

Delivery guarantee: at least once or exactly once, based on configuration

Steps

Follow these steps to send data from Decodable into Confluent Cloud.

  1. From the Connections page, select the Confluent Cloud connector and complete the following fields.

    Each field below is listed in this order: UI field, property name in the Decodable CLI, description.

    Connection Type

    N/A

    Select sink for Decodable to send data to the cluster provided.

    Cluster ID

    cluster.id

    Specify the ID of your Confluent cluster.

    Cluster API Endpoint

    cluster.api.endpoint

    The API endpoint for your Confluent cluster.

    Cluster API Key

    cluster.api.key

    The API key for your Confluent cluster.

    Cluster API Secret

    cluster.api.secret

    The secret associated with your provided API key. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

    Value Format

    value.format

    The format for data in the Confluent cluster. Must be one of the following.

    • JSON

    • Raw

    • Avro: If you are using the Decodable CLI to create or configure this connection, specify confluent-avro to use Avro with a schema registry. In this mode, the Avro schema is derived from the connection schema and is also validated and registered against the configured schema registry. Otherwise, specify avro to generate the Avro schema from the connection schema. The generated schema is shown in the inferred_schema connection property. For more information on the additional properties used by the Decodable CLI for Avro formats, see Avro.

    • Debezium (JSON): Select this option if you want to send Change Data Capture (CDC) data through this connector.

  2. Select the topic that you want Decodable to send data to. Select Next.

  3. Select the stream that you’d like to connect to this connector. This is the stream containing the data that you’d like to send to Confluent Cloud. Select Next.

  4. Define the connection’s schema. Decodable can auto-populate the connection’s schema using Confluent’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition.

    1. If you want to auto-populate the connection schema using Confluent’s schema registry, you’ll also need to provide the Schema Registry Endpoint, the Schema Registry API Key, and the Schema Registry API Secret that Decodable uses to communicate with your schema registry.

  5. Select Next when you are finished defining the connection’s schema.

  6. Give the newly created connection a Name and Description and select Save.
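If you script connection creation instead of using the UI, the same fields map onto --prop arguments of decodable connection create. The following is a minimal sketch, not an official client: the helper name build_create_command is hypothetical, the flags are the ones used in the CLI example later on this page, the property keys and sample values come from the connection get output in the Reference section, and the lowercase format value json is an assumption. The script only prints the command so it can be reviewed before it is run.

```python
import shlex


def build_create_command(name, stream_id, props, description=None):
    """Assemble the argument list for creating a Confluent Cloud sink.

    Hypothetical helper: it only builds the command, it does not run it.
    """
    argv = [
        "decodable", "connection", "create",
        "--name", name,
        "--type", "sink",
        "--stream-id", stream_id,
        "--connector", "confluent-cloud",
    ]
    if description:
        argv += ["--description", description]
    # Each connection property becomes one "--prop key=value" pair.
    for key, value in props.items():
        argv += ["--prop", f"{key}={value}"]
    return argv


sink_props = {
    "cluster.id": "lkc-98765",
    "cluster.api.endpoint": "https://example.us-west-2.aws.confluent.cloud:443",
    "cluster.api.key": "NF2GERAAFNABCDEF",
    # ID of a secret resource in your account, not the secret value itself.
    "cluster.api.secret": "d5d7a670",
    "format": "json",  # assumed CLI spelling of the JSON value format
    "topic": "clickstream_topic",
}

command = build_create_command("confluent_cloud_sink", "0dc6bcbc", sink_props)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(command))
```

Keeping the command as a list until the last step avoids quoting mistakes; pass the list to subprocess.run only after checking the printed line.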

Reference

Avro

A popular choice for data serialization on Apache Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

To use plain Avro, set the connection property format=avro. In this mode, the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

./decodable connection get 2428ebe9
confluent_cloud_sink
  id                       2428ebe9
  description              -
  connector                confluent-cloud
  type                     sink
  stream id                0dc6bcbc
  fields
    0  request_id          STRING
    1  user_id             INT
    2  ts                  BIGINT
    3  endpoint            STRING
  primary key fields       -
  properties
    cluster.api.endpoint   https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key        NF2GERAAFNABCDEF
    cluster.api.secret     d5d7a670
    cluster.id             lkc-98765
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"request_id","type":["null","string"],"default":null},{"name":"user_id","type":["null","int"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"endpoint","type":["null","string"],"default":null}]}
    topic                  clickstream_topic
  target state             RUNNING
  actual state             RUNNING
  requested tasks          1
  actual tasks             1
  requested task size      M
  actual task size         -
  create time              2023-02-27T20:06:09Z
  update time              2023-02-27T20:06:09Z
  last runtime error       -
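To make the relationship between the connection fields and inferred_schema concrete, here is a small sketch that reproduces the schema shown above from the four fields of that connection. It is an illustration inferred from this single example, not Decodable's actual generator: the function name derive_avro_schema and the type mapping are assumptions, and only the three types that appear in the example are covered.

```python
import json

# Mapping inferred from the example output above; other field types are
# deliberately left out because this page does not show how they map.
AVRO_TYPES = {"STRING": "string", "INT": "int", "BIGINT": "long"}


def derive_avro_schema(fields, record_name="record"):
    """Build an Avro record schema from (name, type) pairs.

    Every field becomes a nullable union with a null default, matching
    the shape of the inferred_schema property in the example.
    """
    avro_fields = []
    for name, field_type in fields:
        if field_type not in AVRO_TYPES:
            raise ValueError(f"no Avro mapping assumed for type {field_type!r}")
        avro_fields.append(
            {"name": name, "type": ["null", AVRO_TYPES[field_type]], "default": None}
        )
    return {"type": "record", "name": record_name, "fields": avro_fields}


connection_fields = [
    ("request_id", "STRING"),
    ("user_id", "INT"),
    ("ts", "BIGINT"),
    ("endpoint", "STRING"),
]

schema = derive_avro_schema(connection_fields)
# Compact separators give the same single-line form as the CLI output.
print(json.dumps(schema, separators=(",", ":")))
```

Because each field is a union with null listed first and a null default, consumers reading with this schema treat every field as optional.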

Avro with schema registry

To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.

When using this mode, the following properties are used:

Each property below is listed in this order: property, disposition, description.

confluent-registry.url

required

The URL of the schema registry to use.

confluent-registry.api-key

required

The username or API key to use when connecting to the schema registry.

confluent-registry.api-secret

required

The password associated with the username. This must be provided as a secret resource. If you are using the Decodable CLI, run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

For example, you can create a Confluent Cloud sink connection that uses a secured schema registry with:

decodable connection create \
--name avro_registry_sink \
--description "A Confluent Cloud sink connection with a schema registry" \
--type sink \
--stream-id 295e2a7f \
--connector confluent-cloud \
--prop format="confluent-avro" \
--prop topic="my-topic" \
--prop cluster.id="<cluster-id>" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-secret-id>" \
--prop confluent-registry.url="https://my.schema.registry.cloud" \
--prop confluent-registry.api-key="<api-key>" \
--prop confluent-registry.api-secret="<api-secret>"
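
Once a connection that uses a schema registry is running, you may want to confirm what was registered. The sketch below builds a request for the latest schema version using the Confluent Schema Registry REST endpoint /subjects/{subject}/versions/latest with HTTP basic authentication. It rests on assumptions that this page does not confirm: that the schema is registered under the subject <topic>-value (Confluent's default topic name strategy), and that the registry accepts the same API key and secret supplied to the connection. The function names are hypothetical.

```python
import base64
import json
import urllib.parse
import urllib.request


def latest_schema_request(registry_url, topic, api_key, api_secret):
    """Build (but do not send) a request for the newest registered schema.

    Assumes the value schema lives under the subject "<topic>-value".
    """
    subject = urllib.parse.quote(f"{topic}-value", safe="")
    url = f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        url,
        headers={
            "Authorization": f"Basic {token}",
            "Accept": "application/vnd.schemaregistry.v1+json",
        },
    )


def fetch_latest_schema(request):
    """Send the request and return the registered Avro schema as a dict."""
    with urllib.request.urlopen(request, timeout=10) as response:
        body = json.load(response)
    # The registry returns the schema as a JSON-encoded string.
    return json.loads(body["schema"])


request = latest_schema_request(
    "https://my.schema.registry.cloud", "my-topic", "<api-key>", "<api-secret>"
)
print(request.full_url)
```

Call fetch_latest_schema(request) with real credentials to retrieve the schema; building and sending are kept separate so the URL can be inspected without network access.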