Redpanda source connector Features Connector name redpanda Delivery guarantee Exactly once Supported task sizes S, M, L Multiplex capability A single instance of this connector can read from a single topic. Supported stream types Append stream Configuration properties Property Description Required Default Connection bootstrap.servers A comma-separated list of your Redpanda brokers. You must enter at least one broker. Enter each broker in the format: <host>:<port>. The broker list can’t contain any spaces. Yes security.protocol Specify the security protocol to use when connecting to a broker. Must be one of the following: PLAINTEXT TLS SASL_SSL SASL_PLAINTEXT See the Security protocols section for more information about these protocols. For the additional fields that you’ll need to fill out if you want to secure your connection with TLS or SASL Authentication. — PLAINTEXT Data topic The name of the topic from which to read data. Yes key.format The format for the key, if present, in the Redpanda message. Must be one of: avro avro-confluent (see below for further properties) json debezium-json raw See Serialization formats for more details. — key.fields A list of the key’s field names, delimited by semicolons, that comprise the message key. For nested fields specify the root key only, and the nested fields as part of the connection’s schema. See Message key for more details and an example of using Redpanda message keys. — value.format The format for the value payload in the Redpanda message. Must be one of: avro avro-confluent (see below for further properties) json debezium-json debezium-avro-confluent raw See Serialization formats for more details. — value.fields-include Which fields from the Redpanda message to include in the payload written to Decodable. ALL: all fields are written, including the fields from the key EXCEPT_KEY: all fields from the value of the message are written, but not those from the key — ALL Advanced parse-error-policy Select the error handling policy. Must be one of the following: FAIL: When set to FAIL, Decodable stops the connection if any validation errors are encountered in the incoming data stream. IGNORE: When set to IGNORE, Decodable ignores all invalid records. All validated records are sent. With this policy, the connection isn’t stopped nor will any errors or warnings be shown if there is an invalid record. — scan.startup.mode Specifies where to start reading data when the connection is first started, or when it’s restarted with the state discarded. Must be one of the following: group-offsets: Start reading data from a specified group consumer. earliest-offset: Start reading data from the earliest available point in the stream. latest-offset: Start reading data from the latest available point in the stream. timestamp: Start reading data from a specific timestamp. specific-offset: Start reading data from a specific offset value. — latest-offset properties.group.id The group ID used by the consumer. Only required if scan.startup.mode is set to group-offsets. properties.auto.offset.reset What to do when there is no initial offset in Redpanda (like a new consumer group) or if the specified offset no longer exists on the server (because of data deletion). — Format: Avro avro-confluent.schema-registry.url The URL of the schema registry to use. Yes avro-confluent.basic-auth.credentials-source The source of the authentication credentials to use, if required. Must be one of: USER_INFO SASL_INHERIT. Typically USER_INFO is used most often when schema registry authentication is enabled. — avro-confluent.basic-auth.user-info The authentication credentials to use, if credentials are to be used. This must be provided as a secret resource. The contents should be in the form username:password. — avro-confluent.long-schema-id Whether to parse the schema ID a LONG. Set this to true for reading and writing Apicurio-style payloads rather than the Confluent default. If set to false then the schema id is read as an INTEGER. — false Security: SASL sasl.mechanism Specify the SASL mechanism as configured by the Redpanda broker. Valid values are: PLAIN SCRAM-SHA-256 SCRAM-SHA-512 Yes sasl.username The username or API key for authentication. Yes sasl.password The secret associated with your provided API key. This must be provided as a secret resource. Yes Security: TLS tls.broker.certificate The public certificate for the broker used to encrypt traffic to Decodable. Yes tls.client.key The secret associated with the client TLS key used by mTLS connections. The key must be an unencrypted key in PKCS#8 format. This must be provided as a secret resource. Only required for mTLS tls.client.certificate The client certificate signed by the broker. You must use the Decodable CLI to specify the client certificate. See here for more details. Message key To read the data held in the message key you need to do the following: Specify the format of the data in the key with key.format. Specify the name of the fields in the key with key.fields. Set value.fields-include=EXCEPT_KEY. If you are using the Decodable web interface to create the connection, you will need to use the CLI (as shown below) or API after creating the connection to update it with this property. Add the key fields and their data types to the connection schema. Do not set these as primary key in the schema. Example This example is based on a Redpanda topic in which the key looks like this: { "tenant": { "id": "acme-corp", "region": "us-west-2" } } and the value like this: { "product_id": 1, "order_ts": 1534772501276, "total_amount": 10.50 } CLI To create a connection directly based on the above message key and value with the CLI use: decodable connection create \ --name orders \ --type source \ --connector redpanda \ --prop bootstrap.servers=broker:port \ --prop value.format=json \ --prop key.fields=tenant \ --prop key.format=json \ --prop parse-error-policy=FAIL \ --prop properties.auto.offset.reset=none \ --prop scan.startup.mode=earliest-offset \ --prop topic=orders \ --prop value.fields-include=EXCEPT_KEY \ --field tenant="ROW<id STRING, region STRING>" \ --field product_id="INT" \ --field order_ts="BIGINT" \ --field total_amount="FLOAT" Web app To configure a connection in the Decodable web app do the following: Create a connection to your Redpanda broker including the following settings: Key Format JSON Key Fields tenant Value Format JSON Define the connection schema. This must include the field from the key (tenant). Do not mark it as a primary key field. Save the connection, but don’t activate it yet. Make a note of your connection id. Using the Decodable CLI update your connection (using the id from the previous step) to add the value.fields-include property decodable connection update 553feb9e --prop value.fields-include=EXCEPT_KEY Activate your connection as normal The data from the Redpanda message key will be included in the stream: Metadata fields When consuming records from Redpanda in Decodable, you can access specific metadata fields associated with each Redpanda record. A metadata field is identified by a string-based key and an associated data type. See available metadata for a list of metadata fields that you have access to. A metadata field has its type formed as: {datatype} METADATA [FROM '{key}'] A metadata field is indicated by the METADATA keyword, with an optional FROM to provide the key. If the key isn’t provided explicitly with FROM, it defaults to the name of the field. For example: TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' Available metadata When you connect to Redpanda as a data source, you have access to the following metadata fields. Key Data Type Description topic STRING NOT NULL Topic name of the Redpanda record. partition INT NOT NULL Partition ID of the Redpanda record. headers MAP<string, bytes> NOT NULL Headers of the Redpanda record as a map of raw bytes. leader-epoch INT NULL Leader epoch of the Redpanda record if available. offset BIGINT NOT NULL Offset of the Redpanda record in the partition. timestamp TIMESTAMP_LTZ(3) NOT NULL Timestamp of the Redpanda record. timestamp-type STRING NOT NULL Timestamp type of the Redpanda record. Valid values are: NoTimestampType, CreateTime, or LogAppendTime. Serialization formats The following formats are supported: Format Description json JSON data raw Raw (byte based) values as a single column. avro Plain Avro data using the schema defined when you create the connection avro-confluent Avro data using a predefined schema store in a schema registry. debezium-json A unified format schema for changelogs with additional support for serializing messages using JSON. Select this option or debezium-avro-confluent if you want to send Change Data Capture (CDC) data through this connector. debezium-avro-confluent A unified format schema for changelogs with additional support for serializing messages using Avro with the schema held in a schema registry. Select this option or debezium-json if you want to send Change Data Capture (CDC) data through this connector. Avro A popular choice for data serialization on Redpanda is Apache Avro. Decodable supports both plain Avro, as well as Avro with a schema registry. In order to use plain Avro (without the schema held in a Schema Registry) the Avro schema is generated from the connection schema, and is available on the inferred_schema connection property. For example: $ decodable connection get 69caa7e1 avro_test_source id 69caa7e1 description A Redpanda source connection with plain Avro schema connector redpanda type source stream id 865e8555 schema 0 field1 INT NOT NULL 1 field2 DOUBLE NOT NULL 2 field3 STRING NOT NULL properties bootstrap.servers broker.svc.cluster.local:9092 format avro inferred_schema {"type":"record","name":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"double"},{"name":"field3","type":"string"}]} topic avro_topic […] Avro with Schema Registry In this mode, the Avro schema is still derived from the connection schema, but it’s validated and registered against the configured schema registry. To use Avro with a Schema Registry such as Confluent’s, additional connection properties are required. These can be specified when you create the connection, if using the CLI or API. If you create the connection through the UI you will need to use the CLI/API to update the connection to add the necessary properties. For example, to create a source Redpanda connection reading Avro messages using a secured schema registry using the CLI: decodable connection create \ --name avro_registry_source \ --description "A Redpanda source connection with SASL auth and a schema registry" \ --type source \ --stream-id 295e2a7f \ --connector redpanda \ --prop bootstrap.servers="some.broker.cloud:9092" \ --prop topic="my-topic" \ --prop value.format=avro-confluent \ --prop avro-confluent.schema-registry.url="https://my.schema.registry.cloud" \ --prop avro-confluent.basic-auth.credentials-source=USER_INFO \ --prop avro-confluent.basic-auth.user-info="my-registry-user:my-registry-password" Security protocols Decodable supports four different security protocols for accessing Redpanda brokers. Security protocol Description SASL-authenticated Username and password authentication is used with SASL. Both SSL/TLS and PLAINTEXT encryption is supported, as well as SCRAM and PLAIN authentication mechanisms. mTLS-authenticated Two-way SSL authentication is used, so that Decodable and the Redpanda brokers authenticate each other using the SSL protocol. Additionally, the connection is encrypted using SSL. TLS-authenticated One-way SSL authentication is used. The client (Decodable) holds the server’s (Redpanda brokers) public certificate. Data from the Redpanda brokers is encrypted using the server’s private key and Decodable can decrypt it using the public certificate. Data from Decodable is encrypted using the public certificate and can only be decrypted using the Redpanda broker’s private key. Unauthenticated No authentication takes place between Decodable and the Redpanda brokers. The connection isn’t encrypted. Because Redpanda is fully compatible with Apache Kafka, the configuration required to process data from Redpanda into Decodable is the same as when you process data from Kafka. For detailed instructions on how to create a TLS or mTLS secured connection between Redpanda and Decodable, see the following topics: How to set up SASL Authentication with Kafka. How to set up TLS Encryption with Kafka. How to set up Mutual TLS Authentication with Kafka. Connector starting state and offsets When you create a connection, or restart it and discard state, it will read from the position in the topic as defined by scan startup mode. By default this is latest-offset and will therefore read from the end of the topic. Learn more about starting state here.