Example: Move data between two Apache Kafka topics declaratively

Let’s look at an example YAML file that creates the Decodable resources required to move data between two Apache Kafka topics. It defines four resources: a secret holding the Kafka password; a stream; and two connections, a source connection that receives data from one Kafka topic and a sink connection that sends it to the other.

  1. Define the required resources in a YAML file, for example resources.yaml. In this step, we’ll define a secret for our connection, a source and a sink connection, and a stream to transport data between them.

    ---
    kind: secret
    metadata:
      name: kafka-sasl-password
      description: Password for Kafka SASL username
    spec_version: v1
    ---
    kind: connection
    metadata:
      name: My-Kafka-Connection
      description: A connection to my Kafka topic
    spec_version: v1
    spec:
      connector: kafka # The name of the connector.
      type: source # The type of connector. Enter source if your connector receives data or sink if your connector sends data.
      properties: # The properties of the connector that you want to use. Refer to the connector's documentation for property names and their valid values.
        bootstrap.servers: <broker_list>
        topic: <source_topic_name>
        value.format: json
        security.protocol: SASL_SSL
        sasl.mechanism: SCRAM-SHA-256
        sasl.username: <username>
        sasl.password: kafka-sasl-password # The name of the secret defined earlier.
      stream_name: my-kafka-stream # The name of the stream that you want this connector to send data to.
      schema_v2: # The schema of the connection. This must match the schema of the stream that it is connected to exactly, including any constraints like watermarks or primary key fields.
        fields:
          - kind: physical
            name: field1
            type: string
          - kind: physical
            name: field2
            type: string
          - kind: physical
            name: field3
            type: string
    ---
    kind: stream
    metadata:
      name: my-kafka-stream
      description: An example Kafka stream
    spec_version: v1
    spec:
      schema_v2:
        fields:
          - kind: physical
            name: field1
            type: string
          - kind: physical
            name: field2
            type: string
          - kind: physical
            name: field3
            type: string
    ---
    kind: connection
    metadata:
      name: Destination-Kafka-Connection
      description: A connection to my destination Kafka topic
    spec_version: v1
    spec:
      connector: kafka
      type: sink
      properties:
        bootstrap.servers: <broker_list>
        topic: <destination_topic_name>
        value.format: json
        security.protocol: SASL_SSL
        sasl.mechanism: SCRAM-SHA-256
        sasl.username: <username>
        sasl.password: kafka-sasl-password # The name of the secret defined above.
      stream_name: my-kafka-stream # The name of the stream that you want this connector to receive data from.
      schema_v2:
        fields:
          - kind: physical
            name: field1
            type: string
          - kind: physical
            name: field2
            type: string
          - kind: physical
            name: field3
            type: string
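
    Because both connections use value.format: json, the value of each Kafka record is a JSON object keyed by the schema’s field names. A minimal sketch of a record that conforms to the schema above (the field values here are made up):

    ```python
    import json

    # A record value matching the schema_v2 above:
    # three physical string fields, field1..field3.
    record = {"field1": "a", "field2": "b", "field3": "c"}

    # Serialized form, as it would appear as a record value on the topic.
    encoded = json.dumps(record).encode("utf-8")

    # Round-trips back to the same fields.
    assert json.loads(encoded) == record
    ```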

  2. Apply the YAML definitions to create the resources.

    decodable apply resources.yaml

    A response containing the resources that were created or modified is shown.
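
    As the schema_v2 comment above notes, each connection’s schema must match its stream’s schema exactly, including field order. A quick sanity sketch of that rule, with the schemas hand-copied from the YAML above (not an official validation tool):

    ```python
    # Schemas hand-copied from the YAML definitions above; all three
    # resources declare the same three physical string fields.
    stream_schema = [
        {"kind": "physical", "name": f"field{i}", "type": "string"}
        for i in (1, 2, 3)
    ]
    source_schema = [dict(f) for f in stream_schema]
    sink_schema = [dict(f) for f in stream_schema]

    # The exact-match requirement: field order, names, kinds, and types all agree.
    assert source_schema == stream_schema
    assert sink_schema == stream_schema
    ```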
  3. If the YAML contains new secrets, add the plaintext secret values.

    • Using Decodable Web:

      1. From the Secrets page, find the secret that you defined earlier.

      2. Select the ellipsis icon (…) > Set Value.

      3. Enter the secret value and select Set.

    • Using the Decodable CLI:

      decodable secret write <secret_id> --value <secret_value>
  4. Connections and pipelines are not active by default. Activate them to start processing data. For this example, activate both connections.

    • Using Decodable Web:

      1. Depending on the kind of resource that you want to activate, navigate to the Connections or Pipelines page.

      2. Select the resource that you defined earlier.

      3. From the resource’s Overview page, select Start.

    • Using the Decodable CLI:

      • Connections:

        decodable connection activate <connection_id>
      • Pipelines:

        decodable pipeline activate <pipeline_id>
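
The CLI commands from steps 2–4 can also be chained in a script. A sketch using Python’s subprocess module (the decodable invocations are exactly the commands shown above; the IDs and values are placeholders, and the runner parameter is injectable only so the command construction can be exercised without the CLI installed):

```python
import subprocess

def decodable(*args, runner=subprocess.run):
    # Build a Decodable CLI invocation and execute it with check=True,
    # so a non-zero exit status raises CalledProcessError.
    cmd = ["decodable", *args]
    runner(cmd, check=True)
    return cmd

# Usage, mirroring the steps above (placeholders left as-is):
#   decodable("apply", "resources.yaml")
#   decodable("secret", "write", "<secret_id>", "--value", "<secret_value>")
#   decodable("connection", "activate", "<connection_id>")
```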