Confluent Cloud
Use the Confluent Cloud connector to get data from or send data to a Confluent Cloud cluster.
Overview
Connector name | confluent-cloud |
---|---|
Type | source, sink |
Delivery guarantee | at least once or exactly once, depending on configuration |
Create a connection to Confluent Cloud with the Confluent Cloud connector
Follow these steps to get data from or send data to Confluent Cloud using the web interface. An equivalent CLI command is sketched after the steps.
- From the Connections page, select the Confluent Cloud connector and complete the following fields.
Field | Description |
---|---|
Connection Type | Select whether you want to connect to Confluent Cloud as a source or a sink. If you select source, then Decodable will read data from the cluster provided. If you select sink, then Decodable will send data to the cluster provided. |
Cluster ID | Specify the ID of your Confluent cluster. |
Cluster API Endpoint | The API endpoint for your Confluent cluster. |
Cluster API Key | The API key for your Confluent cluster. |
Cluster API Secret | The secret associated with your provided API key. |
Value Format | The format for data in the Confluent cluster. Must be one of the following: JSON, Raw, Avro, or Debezium (JSON). If you want to send CDC data through this connector, then you must select Debezium (JSON). |
Parse error policy | Only applicable if you are connecting to Confluent Cloud as a source. Select the behavior that you want when Decodable detects a validation error in the incoming data. When set to FAIL, the connection is stopped. When set to IGNORE, the connection continues to run and send records, but invalid records are ignored. |
Consumer Group ID | Only applicable if you are connecting to Confluent Cloud as a source. The group ID used by the consumer. |
Scan Startup Mode | Only applicable if you are connecting to Confluent Cloud as a source. The startup mode for the Kafka consumer. Must be one of the following: group offsets (start from offsets committed in ZooKeeper or the Kafka brokers by a specific consumer group), earliest offset (start from the earliest offset available), latest offset (start from the latest offset), timestamp (start from a user-supplied timestamp for each partition), or specific offsets (start from user-supplied offsets for each partition). |
Consumer Offset Reset Strategy | Only applicable if you are connecting to Confluent Cloud as a source. Select the behavior that you want when there is no initial offset in Kafka (for example, a new consumer group) or when the specified offset no longer exists (for example, because the data has been deleted). |
- Select the stream that you’d like to connect to this connector. Then, select Next.
- Define the connection’s schema. Decodable can auto-populate the connection’s schema using Confluent’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to enter the schema manually, select New Schema or Structured Schema Definition.
  - If you want to auto-populate the connection schema using Confluent’s schema registry, you’ll also need to provide the REST endpoint for your schema registry, along with the API key and API secret used to communicate with it.
- Select Next when you are finished defining the connection’s schema.
- Give the newly created connection a Name and Description and select Save.
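The same connection can also be created with the Decodable CLI. The following is a minimal sketch of a source connection using the properties documented in the Properties section below; the stream ID, topic, endpoint, and credentials are placeholders, not real values.
decodable connection create \
--name confluent_cloud_source \
--description "A Confluent Cloud source connection" \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop format="json" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"
Once created, you can inspect the connection with decodable connection get <connection-id>.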
Security
Confluent Cloud connections always use SSL transport encryption, which encrypts all network traffic between Decodable and the brokers.
SSL really means TLS
The Kafka connector (and Apache Kafka itself) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is really TLS. We do the same to avoid confusion for experienced Kafka users. That said, for the security-minded audience: it really is TLS!
Properties
The following properties are supported by the Confluent Cloud connector.
Property | Disposition | Description |
---|---|---|
cluster.api.endpoint | required | A comma-separated list of Kafka brokers to use to bootstrap connections. Each broker must be a colon-separated (:) hostname and port pair. The list must not include any spaces. Example: broker-a:9092,broker-b:9092. Kafka brokers must be able to receive connections from Decodable's network blocks. |
topic | required | A semicolon-separated list of topic names to read data from when the connection type is source, or to write data to when the connection type is sink. |
format | optional; if not specified, value.format must be used | Specifies a data format used to deserialize and serialize the value. Supported formats are ["raw", "json", "avro", "confluent-avro"]. |
value.format | optional; if not specified, format must be used | An alias for format . |
key.format | optional | Specifies a data format used to deserialize and serialize the key. Supported formats are ["raw", "json", "avro", "confluent-avro"]. |
cluster.api.key | required | The provided username or API key for authentication. |
cluster.api.secret | required | The provided password or API secret for authentication. |
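As an illustration of how these format properties combine, the sketch below creates a hypothetical sink connection that writes raw keys and JSON values; the stream ID, topic, endpoint, and credentials are placeholders.
decodable connection create \
--name confluent_cloud_sink \
--description "A Confluent Cloud sink connection with raw keys and JSON values" \
--type sink \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop key.format="raw" \
--prop value.format="json" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"
Because value.format is an alias for format, specify one or the other for the record value.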
Avro
A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.
Plain Avro
To use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is exposed via the inferred_schema connection property. For example:
decodable connection get 69caa7e1
avro_test_source
id 69caa7e1
description A Kafka source connection with plain Avro schema
connector kafka
type source
stream id 865e8555
schema
0 field1 INT NOT NULL
1 field2 DOUBLE NOT NULL
2 field3 STRING NOT NULL
properties
bootstrap.servers kafka-kafka-brokers.kafka.svc.cluster.local:9092
format avro
inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
topic avro_topic
target state RUNNING
actual state RUNNING
create time 2022-02-04T19:05:19Z
update time 2022-02-08T17:04:46Z
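Note that the output above was captured from a connection that uses the kafka connector. A comparable plain-Avro source defined with the confluent-cloud connector's documented properties would look like the following sketch; the stream ID, topic, endpoint, and credentials are placeholders.
decodable connection create \
--name avro_test_source \
--description "A Confluent Cloud source connection with plain Avro schema" \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop format="avro" \
--prop topic="avro_topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>"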
Avro with Schema Registry
To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is also validated against and registered with the configured schema registry.
When using this mode, the following properties are used:
Property | Disposition | Description |
---|---|---|
confluent-registry.url | required | The URL of the schema registry to use. |
confluent-registry.api-key | required | The username or API key to use. |
confluent-registry.api-secret | required | The password or API secret to use. |
For example, you can create a Confluent Cloud source connection that uses a secured schema registry:
decodable connection create \
--name avro_registry_source \
--description "A Confluent Cloud source connection with a schema registry" \
--type source \
--stream-id 295e2a7f \
--connector confluent-cloud \
--prop format="confluent-avro" \
--prop topic="my-topic" \
--prop cluster.api.endpoint="<cluster-api-endpoint>" \
--prop cluster.api.key="<cluster-api-key>" \
--prop cluster.api.secret="<cluster-api-secret>" \
--prop confluent-registry.url="https://my.schema.registry.cloud" \
--prop confluent-registry.api-key="<api-key>" \
--prop confluent-registry.api-secret="<api-secret>"
Metadata fields
Confluent Cloud Connections provide access to Kafka metadata fields. See Connector reference: Apache Kafka: Metadata fields for the complete list of metadata fields provided, and further instructions.