Confluent Cloud

Use the Confluent Cloud connector to get data from or send data to a Confluent Cloud cluster.

Overview

Connector name: confluent-cloud
Type: source, sink
Delivery guarantee: at least once or exactly once, based on configuration

Create a connection to Confluent Cloud with the Confluent Cloud connector

Follow these steps to create a connection that gets data from or sends data to Confluent Cloud.

  1. From the Connections page, select the Confluent Cloud connector and complete the following fields.
Connection Type: Select whether you want to connect to Confluent Cloud as a source or a sink.

If you select source, then Decodable will read data from the cluster provided.
If you select sink, then Decodable will send data to the cluster provided.

Cluster ID: The ID of your Confluent cluster.

Cluster API Endpoint: The API endpoint for your Confluent cluster.

Cluster API Key: The API key for your Confluent cluster.

Cluster API Secret: The secret associated with your provided API key.

Value Format: The format for data in the Confluent cluster. Must be one of the following:
- JSON
- Raw
- Avro
- Debezium (JSON)

If you want to send CDC data through this connector, then you must select Debezium (JSON).

Parse error policy: Only applicable if you are connecting to Confluent Cloud as a source. Select the behavior that you want when Decodable detects a validation error in the incoming data. When set to FAIL, the connection is stopped. When set to IGNORE, the connection continues to run and send records; however, invalid records are ignored.

Consumer Group ID: Only applicable if you are connecting to Confluent Cloud as a source. The group ID used by the consumer.

Scan Startup Mode: Only applicable if you are connecting to Confluent Cloud as a source. The startup mode for the Kafka consumer. Can be one of the following:
- group offsets: Start from committed offsets in ZK / Kafka brokers for a specific consumer group.
- earliest offset: Start from the earliest offset possible.
- latest offset: Start from the latest offset.
- timestamp: Start from a user-supplied timestamp for each partition.
- specific offsets: Start from user-supplied specific offsets for each partition.

Consumer Offset Reset Strategy: Only applicable if you are connecting to Confluent Cloud as a source. Select the behavior that you want when there is no initial offset in Kafka (for example, a new consumer group) or when the specified offset no longer exists (for example, because that data has been deleted).
  2. Select the stream that you’d like to connect to this connector. Then, select Next.

  3. Define the connection’s schema. Decodable can auto-populate the connection’s schema using Confluent’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to enter the schema manually, select New Schema or Structured Schema Definition.

    1. If you want to auto-populate the connection schema using Confluent’s schema registry, you’ll also need to provide the REST endpoint for your schema registry, along with the API key and API secret for communicating with it.
  4. Select Next when you are finished defining the connection’s schema.

  5. Give the newly created connection a Name and Description and select Save.
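
You can also create this connection from the command line with the decodable CLI, using the connection properties listed in the Properties section below. The following is a minimal sketch of a source connection reading JSON records; the stream id, topic, endpoint, and credentials shown are placeholders to replace with your own values:

decodable connection create \
--name my_confluent_source \
--description "A Confluent Cloud source connection reading JSON records" \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop format="json" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"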

Security

Confluent Cloud connections always use SSL transport encryption, which encrypts all network traffic between Decodable and the brokers.

SSL really means TLS
The Kafka connector (and Apache Kafka) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is really TLS. We do the same to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's really TLS!

Properties

The following properties are supported by the Confluent Cloud connector.

cluster.api.endpoint (required): A comma-separated list of Kafka brokers to use to bootstrap connections. Each broker must be a colon-separated (:) hostname/port pair. The list must not include any spaces. Example: broker-a:9092,broker-b:9092. Kafka brokers must be able to receive connections from Decodable's network blocks.
topic (required): A semicolon-separated list of topic names to read data from when the connection type is source, or to write data to when the connection type is sink.
format (optional; if not specified, value.format must be used): Specifies the data format used to deserialize and serialize the value. Supported formats are ["raw", "json", "avro", "confluent-avro"].
value.format (optional; if not specified, format must be used): An alias for format.
key.format (optional): Specifies the data format used to deserialize and serialize the key. Supported formats are ["raw", "json", "avro", "confluent-avro"].
cluster.api.key (required): The provided username or API key for authentication.
cluster.api.secret (required): The provided password or API secret for authentication.
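
As a sketch of how these properties fit together, the following command creates a sink connection that writes JSON values with raw keys; the stream id, topic, endpoint, and credentials shown are placeholders:

decodable connection create \
--name my_confluent_sink \
--description "A Confluent Cloud sink connection writing JSON values with raw keys" \
--type sink \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop value.format="json" \
--prop key.format="raw" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"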

Avro

A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

In order to use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema, and is available on the inferred_schema connection property. For example:

decodable connection get 69caa7e1
avro_test_source
  id                       69caa7e1
  description              A Kafka source connection with plain Avro schema
  connector                kafka
  type                     source
  stream id                865e8555
  schema
    0  field1              INT NOT NULL
    1  field2              DOUBLE NOT NULL
    2  field3              STRING NOT NULL
  properties
    bootstrap.servers      kafka-kafka-brokers.kafka.svc.cluster.local:9092
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
    topic                  avro_topic
  target state             RUNNING
  actual state             RUNNING
  create time              2022-02-04T19:05:19Z
  update time              2022-02-08T17:04:46Z
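
For the Confluent Cloud connector, a plain Avro connection similar to the one above could be created along the following lines. This is only a sketch; the stream id, topic, endpoint, and credentials shown are placeholders:

decodable connection create \
--name avro_test_source \
--description "A Confluent Cloud source connection with plain Avro schema" \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop format="avro" \
--prop topic="avro_topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"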

Avro with Schema Registry

To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.

When using this mode, the following properties are used:

confluent-registry.url (required): The URL of the schema registry to use.
confluent-registry.api-key (required): The username or API key for the schema registry.
confluent-registry.api-secret (required): The password or API secret for the schema registry.

For example, you can create a Confluent Cloud source connection that authenticates to both the cluster and a secured schema registry using:

decodable connection create \
--name avro_registry_source \
--description "A Confluent Cloud source connection with a secured schema registry" \
--type source \
--stream-id 295e2a7f \
--connector confluent-cloud \
--prop format="confluent-avro" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="some.broker.cloud:9092" \
--prop cluster.api.key="my-api-key" \
--prop cluster.api.secret="my-secret-key" \
--prop confluent-registry.url="https://my.schema.registry.cloud" \
--prop confluent-registry.api-key="<api-key>" \
--prop confluent-registry.api-secret="<api-secret>"

Metadata fields

Confluent Cloud Connections provide access to Kafka metadata fields. See Connector reference: Apache Kafka: Metadata fields for the complete list of metadata fields provided, and further instructions.


Apache Kafka, Kafka®, Apache® and associated open source project names are either registered trademarks or trademarks of The Apache Software Foundation.