# Confluent Cloud source connector

## Features

| Feature | Details |
|---|---|
| Connector name | `confluent-cloud` |
| Delivery guarantee | Exactly once |
| Supported task sizes | S, M, L |
| Multiplex capability | A single instance of this connector can read from multiple topics. |
| Supported stream types | Append stream, Change stream |

Although the Confluent Cloud connector supports both types of stream, a single Confluent Cloud connection may only use one type of stream. If you want to send data from both change streams and append streams, you must create separate Confluent Cloud connections.

## Configuration properties

### Connection

| Property | Description | Required | Default |
|---|---|---|---|
| `cluster.id` | The ID of your Confluent cluster. | Yes | |
| `cluster.api.endpoint` | The API endpoint for your Confluent cluster. | Yes | |
| `cluster.api.key` | The API key for your Confluent cluster. | Yes | |
| `cluster.api.secret` | The API secret for your Confluent cluster. This must be provided as a secret resource. | Yes | |

### Data

| Property | Description | Required | Default |
|---|---|---|---|
| `key.format` | The format for the key, if present, in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent` (see below for further properties). See Serialization formats for more details. | Only required when `value.fields-include` = `EXCEPT_KEY`. See the Field Inclusion Policy section for more details. | |
| `value.format` | The format for the value payload in the message. Must be one of: `avro`, `avro-confluent` (see below for further properties), `json`, `debezium-json`, `debezium-avro-confluent` (see below for further properties). See Serialization formats for more details. | — | |
| `value.fields-include` | Where data written to the Decodable payload should be read from. `ALL`: the message value only. `EXCEPT_KEY`: the message key and value. See the Field Inclusion Policy section for more details on setting this property. | — | `ALL` |

### Format: Avro

| Property | Description | Required | Default |
|---|---|---|---|
| `confluent-registry.url` | The URL of the schema registry to use. | Yes | |
| `confluent-registry.api-key` | The API key for the Schema Registry. | Yes | |
| `confluent-registry.api-secret` | The API secret for the Schema Registry. This must be provided as a secret resource. | Yes | |

### Advanced

| Property | Description | Required | Default |
|---|---|---|---|
| `parse-error-policy` | The error handling policy. Must be one of: `FAIL`: Decodable stops the connection if any validation errors are encountered in the incoming data stream. `IGNORE`: Decodable ignores all invalid records; all valid records are sent. With this policy, the connection isn't stopped, and no errors or warnings are shown if there is an invalid record. | — | |
| `scan.startup.mode` | Specifies where to start reading data when the connection is first started, or when it's restarted with its state discarded. Must be one of: `group-offsets`: start reading from the committed offsets of the specified consumer group. `earliest-offset`: start reading from the earliest available point in the stream. `latest-offset`: start reading from the latest available point in the stream. `timestamp`: start reading from a specific timestamp. | — | `latest-offset` |
| `properties.group.id` | The group ID used by the consumer. | Only required if `scan.startup.mode` is set to `group-offsets`. | |
| `scan.startup.timestamp-millis` | The timestamp from which the consumer will start scanning, in milliseconds since January 1, 1970 00:00:00.000 GMT. | Only required if `scan.startup.mode` is set to `timestamp`. | |
| `properties.auto.offset.reset` | What to do when there is no initial offset in Confluent Cloud (such as for a new consumer group), or if the specified offset no longer exists on the server (because of data deletion). | — | |

## Ingesting data from multiple topics

A single instance of this connector can read from multiple topics. If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using `decodable connection scan`.
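The `scan.startup.timestamp-millis` property in the table above is a plain Unix epoch value in milliseconds. As a minimal sketch (not part of the connector itself), you can derive it in Python; pinning the timezone to UTC avoids the usual off-by-hours pitfall:

```python
from datetime import datetime, timezone

# Epoch milliseconds for a UTC instant, suitable for scan.startup.timestamp-millis.
# The example date is arbitrary.
start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
timestamp_millis = int(start.timestamp() * 1000)
print(timestamp_millis)  # 1704067200000
```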
## Resource specifier keys

The following resource specifier keys are available:

| Name | Description |
|---|---|
| `topic` | The topic name |

## Serialization formats

The following formats are supported:

| Format | Description |
|---|---|
| `json` | JSON data. |
| `avro` | Plain Avro data using the schemas of the Streams attached to the connection. |
| `avro-confluent` | Avro data using a predefined schema stored in a schema registry. |
| `debezium-json` | A unified format schema for changelogs, with additional support for serializing messages using JSON. Select this option or `debezium-avro-confluent` if you want to send Change Data Capture (CDC) data through this connector. |
| `debezium-avro-confluent` | A unified format schema for changelogs, with additional support for serializing messages using Avro with the schema held in a schema registry. Select this option or `debezium-json` if you want to send Change Data Capture (CDC) data through this connector. |

### Avro and Debezium-Avro with Schema Registry

In this mode, the Avro schemas are derived from the schemas of the Streams attached to the connection, but they are validated against, and registered with, the configured schema registry. To use Avro or Debezium-Avro with a Schema Registry such as Confluent's, additional connection properties are required.
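To make the changelog formats above more concrete, here is a hedged sketch of what a `debezium-json` record looks like on the wire: each message wraps the row state in `before`/`after` fields together with an `op` code (real messages also carry `source` and `ts_ms` metadata, and the exact shape depends on the upstream Debezium connector):

```python
import json

# Simplified Debezium changelog envelope for an UPDATE on a hypothetical table.
raw = json.dumps({
    "before": {"id": 1, "name": "old_name"},   # row state before the change
    "after": {"id": 1, "name": "new_name"},    # row state after the change
    "op": "u",  # "c" = create, "u" = update, "d" = delete, "r" = snapshot read
})

def interpret(message: str) -> str:
    """Classify a Debezium envelope by its operation code."""
    envelope = json.loads(message)
    return {"c": "insert", "r": "insert", "u": "update", "d": "delete"}[envelope["op"]]

print(interpret(raw))  # update
```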
Here is an example resource YAML that describes a Confluent Cloud connection with the `avro-confluent` format, which you can create using the Decodable CLI:

```yaml
---
kind: connection
metadata:
  name: cc-avro-confluent-source
  description: Confluent Cloud source connection with avro-confluent format
spec_version: v2
spec:
  connector: confluent-cloud
  type: source
  stream_mappings:
    - stream_name: avro_in_1
      external_resource_specifier:
        topic: source_topic_1
    - stream_name: avro_in_2
      external_resource_specifier:
        topic: source_topic_2
  properties:
    value.format: avro-confluent
    cluster.api.endpoint: https://example.us-west-2.aws.confluent.cloud:443
    cluster.api.key: ABCDERAAFNJDHZL
    cluster.api.secret: 605079f4
    cluster.id: lkc-54321
    confluent-registry.url: https://example-registry.us-east-2.aws.confluent.cloud
    confluent-registry.api-key: ABCDJ6B2WQMHXL7K
    confluent-registry.api-secret: 5b17d53f
```

## Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by `scan.startup.mode`. By default this is `latest-offset`, so the connector reads from the end of the topic. Learn more about starting state here.

## How to connect to your Confluent Cloud cluster

### Cluster details

The cluster ID and API endpoint can be found on the Cluster overview -> Cluster settings page, in the Identification box. API keys and secrets can be created in Confluent Cloud by clicking Data Integration -> API keys. Here you can see any API keys you've already created, or create a new one with the "Add key" button. Make sure you save the API key and secret pair somewhere you can find it, since you won't be able to retrieve the secret again through the UI.

## Field Inclusion Policy

Depending on the structure of your input messages, you may want to read the message key.
The `value.fields-include` property determines whether an input message's key is projected into the Decodable output payload; when applicable, the `key.format` property describes the format of the message key.

- When `value.fields-include` = `ALL`, the message key is ignored. All output fields are sourced from the message value, including primary or partition key fields when applicable.
- When `value.fields-include` = `EXCEPT_KEY`, the message key is deserialized and its fields are projected into the output record. The structure of a message key must match the primary or partition key fields defined on the receiving stream. The rest of the output record's fields are sourced from the message value.

`key.format` is required when `value.fields-include` = `EXCEPT_KEY`. `EXCEPT_KEY` cannot be set when a primary or partition key does not exist in your output streams.

### Example

For this example, assume the following message key and message value pair is read from your topic.

Message key:

```json
{
  "key": "from_message_key"
}
```

Message value:

```json
{
  "key": "from_message_value",
  "field1": "field1_value",
  "field2": "field2_value"
}
```

Assume the output Stream has three fields: `key` (the primary or partition key field), `field1`, and `field2`.

When `value.fields-include` = `ALL`, the resulting Decodable payload will be:

| Field Name | Value |
|---|---|
| key | from_message_value |
| field1 | field1_value |
| field2 | field2_value |

When `value.fields-include` = `EXCEPT_KEY`, the resulting Decodable payload will be:

| Field Name | Value |
|---|---|
| key | from_message_key |
| field1 | field1_value |
| field2 | field2_value |
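The merge semantics above can be sketched in a few lines of Python. This is an illustrative model only, not the connector's implementation; the field names match the example:

```python
def build_payload(key_fields: dict, value_fields: dict,
                  fields_include: str, primary_keys: list[str]) -> dict:
    """Model of value.fields-include: ALL reads every field from the message
    value; EXCEPT_KEY takes primary/partition key fields from the message key
    and the remaining fields from the message value."""
    if fields_include == "ALL":
        return dict(value_fields)
    if fields_include == "EXCEPT_KEY":
        payload = {k: v for k, v in value_fields.items() if k not in primary_keys}
        payload.update({k: key_fields[k] for k in primary_keys})
        return payload
    raise ValueError(f"unknown policy: {fields_include}")

key = {"key": "from_message_key"}
value = {"key": "from_message_value", "field1": "field1_value", "field2": "field2_value"}

print(build_payload(key, value, "ALL", ["key"])["key"])         # from_message_value
print(build_payload(key, value, "EXCEPT_KEY", ["key"])["key"])  # from_message_key
```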