Confluent Cloud

Overview

Confluent Cloud is a resilient, scalable streaming data service based on Apache Kafka®, delivered as a fully managed service. Confluent’s elastic scalability and virtually infinite storage simplify real-time data movement across public and private clouds. Confluent provides out-of-the-box connectors to the most popular data sources and sinks in the Kafka ecosystem, SQL-based stream processing, a governance suite for streaming data, and more, all fully managed in the same cloud UI.

  • Massive scale without the ops overhead
  • Build for hybrid and multi-cloud
  • Simplify planning with no-limits storage
  • Reliably scale mission-critical apps
  • Run with enterprise-grade security & compliance
  • Enable scripting and workflow automation at scale

Getting Started

Connections come in two flavors: source and sink. Source connections read from an external system and write to a Decodable stream, while sink connections read from a stream and write to an external system. The Confluent Cloud connector can be used in either role. The configuration is almost identical in both cases, except that source connections allow you to define a policy for handling parse errors in the incoming data stream.

Configure As A Source

To create and configure a connector for Confluent Cloud, sign in to the Decodable Web Console, navigate to the Connections tab, click on New Connection, and follow the steps below. For examples of using the command line tools or scripting, see the How To guides.

  1. Select a connector type, either source or sink, which will determine whether data is being streamed into Decodable for processing by one or more pipelines, or streamed out to an external system.

  2. Specify the ID of your Confluent cluster.

  3. Provide a list of one or more Kafka brokers to use to bootstrap connections. Each broker is specified as a hostname and port pair separated by a colon (e.g., broker-a:9092). Multiple brokers must be separated by commas with no spaces.

  4. Specify the username or API key to be used for authentication.

  5. Specify the password or API secret key to be used for authentication.

  6. Select a data format used to deserialize and serialize the keys and values, which can be one of the following:

    • JSON, the JSON format allows you to read and write JSON data based on a JSON schema.
    • Raw, the Raw format allows you to read and write raw (byte-based) values as a single column.
    • Avro, the Apache Avro format allows you to read and write Avro data based on an Avro schema.
    • Debezium (JSON), the Debezium format provides a unified schema for changelogs and supports serializing messages using JSON.
  7. Select a policy for handling record parse errors, which can be either FAIL or IGNORE. When set to FAIL, the connection will change from the Running state to the Stopped state if any validation errors are encountered in the incoming data stream. When set to IGNORE, all valid records will be written, while invalid records will be ignored, without stopping the connection or causing any errors or warnings.
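
The same connection can also be created with the Decodable CLI. The following is a minimal sketch, assuming the property names documented in the Reference section below; the stream ID, endpoint, topic, and API credentials are placeholders that you would replace with your own values.

# Sketch only: placeholder values throughout; replace with your own cluster details.
decodable connection create \
--name confluent_cloud_source \
--description "Example Confluent Cloud source connection" \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop cluster.api.endpoint="broker-a:9092,broker-b:9092" \
--prop cluster.api.key="<api-key>" \
--prop cluster.api.secret="<api-secret>" \
--prop topic="my-topic" \
--prop format="json"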


For more detailed information about creating and configuring an Apache Kafka cluster, see the Confluent Cloud Quick Start guide and related documentation.


Configure As A Sink

To create a Confluent Cloud connector for use as a sink, all the configuration steps above apply, with the exception of the parse error policy, which is not defined on the outbound end of a data stream.
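
As a sketch, a sink connection could be created with the CLI in the same way, changing only the connection type (placeholder values throughout, using the property names from the Reference section below):

# Sketch only: placeholder values; same properties as the source example, minus the parse error policy.
decodable connection create \
--name confluent_cloud_sink \
--description "Example Confluent Cloud sink connection" \
--type sink \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop cluster.api.endpoint="broker-a:9092,broker-b:9092" \
--prop cluster.api.key="<api-key>" \
--prop cluster.api.secret="<api-secret>" \
--prop topic="my-output-topic" \
--prop format="json"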

Reference

Connector name       confluent-cloud
Type                 source, sink
Delivery guarantee   at least once or exactly once, based on configuration

Security

Confluent Cloud connections always use SSL transport encryption, which encrypts all network traffic between Decodable and the brokers.

SSL really means TLS
The Kafka connector (and Apache Kafka) actually implements TLS, which supersedes the SSL protocol. For historical reasons, Kafka calls this SSL rather than TLS, even though the implementation is really TLS. We do the same to avoid confusion for experienced Kafka users. That said, for the security-minded audience, it's really TLS!

Properties

The following properties are supported by the Confluent Cloud connector.

cluster.api.endpoint (required)
    A comma-separated list of Kafka brokers to use to bootstrap connections. Each broker must be a colon-separated (:) hostname/port pair. The list must not include any spaces. Example: broker-a:9092,broker-b:9092. Kafka brokers must be able to receive connections from Decodable's network blocks.

topic (required)
    A semicolon-separated list of topic names to read data from when the connection type is source, or to write data to when the connection type is sink.

format (optional; if not specified, value.format must be used)
    Specifies a data format used to deserialize and serialize the value. Supported formats are ["raw", "json", "avro", "confluent-avro"].

value.format (optional; if not specified, format must be used)
    An alias for format.

key.format (optional)
    Specifies a data format used to deserialize and serialize the key. Supported formats are ["raw", "json", "avro", "confluent-avro"].

cluster.api.key (required)
    The username or API key to use for authentication.

cluster.api.secret (required)
    The password or API secret key to use for authentication.
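
As an illustration of how these properties fit together, the following sketch (placeholder values throughout) uses key.format and value.format instead of the format alias, reading a raw key alongside a JSON value:

# Sketch only: placeholder endpoint, credentials, topic, and stream ID.
decodable connection create \
--name confluent_cloud_keyed_source \
--type source \
--stream-id <stream-id> \
--connector confluent-cloud \
--prop cluster.api.endpoint="broker-a:9092" \
--prop cluster.api.key="<api-key>" \
--prop cluster.api.secret="<api-secret>" \
--prop topic="my-topic" \
--prop key.format="raw" \
--prop value.format="json"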

Avro

A popular choice for data serialization on Kafka is Apache Avro. Decodable supports both plain Avro and Avro with a schema registry.

Plain Avro

To use plain Avro, the connection property format=avro is required. In this mode, the Avro schema is generated from the connection schema and is available in the inferred_schema connection property. For example:

decodable connection get 69caa7e1
avro_test_source
  id                       69caa7e1
  description              A Kafka source connection with plain Avro schema
  connector                kafka
  type                     source
  stream id                865e8555
  schema
    0  field1              INT NOT NULL
    1  field2              DOUBLE NOT NULL
    2  field3              STRING NOT NULL
  properties
    bootstrap.servers      kafka-kafka-brokers.kafka.svc.cluster.local:9092
    format                 avro
    inferred_schema        {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]}
    topic                  avro_topic
  target state             RUNNING
  actual state             RUNNING
  create time              2022-02-04T19:05:19Z
  update time              2022-02-08T17:04:46Z

Avro with Schema Registry

To use Avro with a schema registry, the connection property format=confluent-avro is required. In this mode, the Avro schema is still derived from the connection schema, but it is validated and registered against the configured schema registry.

In this mode, the following properties apply:

avro-confluent.schema-registry.url (required)
    The URL of the schema registry to use.

avro-confluent.basic-auth.credentials-source (required)
    The source of the authentication credentials to use, if required. Allowed values are ["USER_INFO", "SASL_INHERIT"]. USER_INFO is the most common choice when schema registry authentication is enabled.

avro-confluent.basic-auth.user-info (required)
    The authentication credentials to use, specified in the form username:password. Note that the credentials are kept secret once set and will not be visible in the connection properties output.

For example, you can create a source connection that uses both SASL authentication and a secured schema registry:

decodable connection create \
--name avro_registry_source \
--description "A Kafka source connection with SASL auth and a schema registry" \
--type source \
--stream-id 295e2a7f \
--connector confluent-cloud \
--prop format="confluent-avro" \
--prop topic="my-topic" \
--prop bootstrap.servers="some.broker.cloud:9092" \
--prop security.protocol=SASL_SSL \
--prop sasl.username="my-api-key" \
--prop sasl.password="my-secret-key" \
--prop sasl.mechanism="PLAIN" \
--prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \
--prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" \
--prop avro-confluent.basic-auth.credentials-source="USER_INFO"

Apache Kafka, Kafka®, Apache® and associated open source project names are either registered trademarks or trademarks of The Apache Software Foundation.
