Connections

A connection allows data to flow between a Decodable stream and an external system such as a database, messaging system, or storage system. Connections include technology-specific configuration that allows them to communicate with these systems, typically including hostnames, ports, authentication information, and other settings.

Once configured, a connection can be activated to stream data to or from the Decodable platform. Connections come in two flavors: source and sink. Source connections read from an external system and write to a Decodable stream, while sink connections read from a stream and write to an external system.

Decodable works by making network connections to the resources you specify in connections. As a result, things like hostnames must be resolvable, and IP addresses must be routable. Let us know if you need help managing connectivity to your data infrastructure.
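As a quick pre-flight check, a sketch like the following can confirm that the hosts named in a connection's properties resolve from a given machine. It is a minimal illustration in Python and not part of Decodable: the helper names parse_endpoints and unresolvable_hosts are hypothetical, IPv6 literals are not handled, and a successful lookup on your own machine does not prove that the Decodable platform can reach the same address.

```python
import socket


def parse_endpoints(servers: str) -> list[tuple[str, int]]:
    """Split a 'host:port,host:port' string into (host, port) pairs."""
    endpoints = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        endpoints.append((host, int(port)))
    return endpoints


def unresolvable_hosts(endpoints, resolve=socket.getaddrinfo):
    """Return the hosts whose name lookup fails (hypothetical helper)."""
    failed = []
    for host, port in endpoints:
        try:
            resolve(host, port)
        except OSError:  # socket.gaierror is a subclass of OSError
            failed.append(host)
    return failed
```

For example, unresolvable_hosts(parse_endpoints("a:9093,b:9093,c:9093")) would list whichever of a, b, and c cannot be resolved from where the script runs. The resolve parameter exists only so the lookup can be replaced in tests.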

Common Connector Features

Each connector specifies the set of features it supports from a common set.

Processing Guarantees - Connectors always specify whether they provide at-least-once, at-most-once, or exactly-once processing. While Decodable's stream processing engine always processes data exactly once, some connectors either don't support this level of guarantee or make it configurable.

Stream Direction - Connectors specify whether they support being used as sources, sinks, or both.

Managing Connections

Connections can be viewed, created, updated, deleted, activated, and deactivated from the app, the CLI, or the API. Every connection has a small set of common configuration elements, plus a set of technology-specific properties. Properties are documented for each supported connector in the Connector Reference.

When connections are first created, they are in a deactivated state. While deactivated, a connection can have its properties updated, be assigned to a new stream, or be deleted. Once activated, the connection begins reading data from the configured source and writing to the destination sink, and its target state is shown as RUNNING. Updates to the connection are not allowed until it is deactivated.
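These lifecycle rules can be summarized as a small state model. The sketch below is only an illustration in Python: the Connection class and its method names are hypothetical and do not correspond to any Decodable client library. It encodes two rules stated here: new connections start deactivated, and updates are rejected while the target state is RUNNING.

```python
class Connection:
    """Toy model of the activate/deactivate rules (hypothetical, not a Decodable API)."""

    def __init__(self, name, properties=None):
        self.name = name
        self.properties = dict(properties or {})
        self.target_state = "STOPPED"  # connections are created deactivated

    def activate(self):
        self.target_state = "RUNNING"

    def deactivate(self):
        self.target_state = "STOPPED"

    def update(self, properties):
        # Updates are only allowed while the connection is deactivated.
        if self.target_state == "RUNNING":
            raise RuntimeError("deactivate the connection before updating it")
        self.properties.update(properties)
```

In this model, calling update on an activated connection raises an error, and the same call succeeds after deactivate.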

All connectors have these common elements:

id (system-generated) - A unique id generated by the system to track this connection. See Concepts - Resource IDs for more information.

name (required) - A name to help you identify this connection. Names are typically short, all lowercase, and with words separated by hyphens (-) or underscores (_). Examples: kafka-prod, ml_cluster3.

connector (required) - This specifies the connector to use, and must be one of the supported connectors described in the Connector Reference. Depending on the connection's type, different properties will be supported. A connection's connector cannot be updated once it's created.

type (required) - Whether the connection will operate as a "source" or "sink". Some connectors only support one type or the other. See Connector Reference for more information. A connection's type cannot be updated once it's created.

description - A longer description of the connection. Use this to help you and your team know when and how to use this connection.

schema (required) - The names and types of the fields used by the connection. The schema's fields are provided individually using the --field flag. To modify the schema, the entire new schema must be provided with the update command, not just the new fields. See the Data Types section below for more information.

stream-id - The stream that the connection will write to (if the type is "source") or read from (if the type is "sink"). If stream-id is not provided, a new stream will be provisioned for the connection using the provided schema.
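Because a schema update replaces the whole schema, it can help to build the complete field list before running the update command. The following Python sketch assumes a hypothetical helper, field_flags, that merges the existing fields with the changed ones and emits one --field name=TYPE argument per field. It only builds the argument list and does not invoke the CLI.

```python
def field_flags(existing: dict[str, str], changes: dict[str, str]) -> list[str]:
    """Return --field arguments for the full schema after applying changes."""
    # Later entries override earlier ones; insertion order is preserved.
    merged = {**existing, **changes}
    flags = []
    for name, data_type in merged.items():
        flags += ["--field", f"{name}={data_type}"]
    return flags
```

Passing the current schema together with a single new field yields arguments for every field, which is what the update command needs.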

Connections also support properties that provide connector-specific configuration. Properties are just key/value pairs. The keys and values are always strings, although the connectors may require that values be formatted a specific way.
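Since property keys and values are always strings, a script that assembles a command from structured configuration has to convert values itself. A minimal Python sketch, assuming a hypothetical helper named prop_flags: the --prop key=value form and the comma-separated server list match the Kafka example in this section, but how any given connector expects its values to be formatted is connector-specific.

```python
def prop_flags(properties: dict) -> list[str]:
    """Return --prop arguments with every value rendered as a string."""
    flags = []
    for key, value in properties.items():
        if isinstance(value, (list, tuple)):
            # Assumption: list values are joined with commas, as in the
            # bootstrap servers example; other connectors may differ.
            text = ",".join(str(item) for item in value)
        else:
            text = str(value)
        flags += ["--prop", f"{key}={text}"]
    return flags
```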

Data Types

Connections support the same physical and computed schema field types as Streams do. See the Streams Data Types reference for full details.

Metadata fields

In addition, some Connections support metadata fields. Currently, these must be source Connections that do not use a Debezium (change-envelope) format.

The set of metadata fields exposed to a Connection is specific to its connector. Every metadata field is identified by a string-based key, and has a data type. For example, the Kafka connector exposes a metadata field with key timestamp and data type TIMESTAMP_LTZ(3).

The type of a metadata field has the form:

{datatype} METADATA [FROM '{key}']

Thus, a metadata field is indicated by the METADATA keyword, with an optional FROM to provide the key. If the key is not provided explicitly (with FROM), it defaults to the name of the field.

For example:

TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
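To make the defaulting rule concrete, the following Python sketch parses a field type of this form and resolves the metadata key. The function parse_metadata_type and its regular expression are illustrative assumptions, not Decodable's actual parser; the real grammar may accept variations in case, whitespace, or quoting that this sketch does not.

```python
import re

# {datatype} METADATA [FROM '{key}']
_METADATA_TYPE = re.compile(
    r"^(?P<datatype>.+?)\s+METADATA(?:\s+FROM\s+'(?P<key>[^']+)')?$"
)


def parse_metadata_type(field_name: str, field_type: str):
    """Return (datatype, key) for a metadata field, or None for a regular field."""
    match = _METADATA_TYPE.match(field_type.strip())
    if match is None:
        return None
    # Without an explicit FROM clause, the key defaults to the field name.
    return match.group("datatype"), match.group("key") or field_name
```

With this sketch, a field named timestamp declared as TIMESTAMP_LTZ(3) METADATA resolves to the same key as a field named ts declared with FROM 'timestamp'.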

Example Operations

Creating a Kafka source connection:

decodable conn create \
  --name kafka_prod \
  --connector kafka \
  --type source \
  --stream-id 2fc5dc5a \
  --field raw_data=STRING \
  --field new_field=STRING \
  --prop properties.bootstrap.servers=a:9093,b:9093,c:9093 \
  --prop topic=my_topic \
  --prop value.format=json

# Created connection kafka_prod (3e91ade5)

Listing existing connections:

decodable conn list

# id         name                     connector    type         create time                   update time
# 3e91ade5   kafka_prod               kafka        source       2021-11-03T22:07:13Z          2021-11-03T22:07:13Z
# 45603509   datagen_envoy_connection datagen      source       2021-11-03T16:14:06Z          2021-11-03T16:14:06Z

Viewing a connection:

decodable conn get 3e91ade5
# kafka_prod
#   id                       3e91ade5
#   description
#   connector                kafka
#   type                     source
#   stream id                2fc5dc5a
#   schema
#     0  raw_data              STRING
#     1  new_field             STRING
#   properties
#     properties.bootstrap.servers a:9093,b:9093,c:9093
#     topic                    my_topic
#     value.format             json
#   target state             STOPPED
#   actual state             STOPPED
#   requested tasks          1
#   actual tasks             0
#   create time              2021-11-03T22:07:13Z
#   update time              2021-11-03T22:07:13Z
#   last runtime error       <none>
#   metrics                  <none>

Updating a connection and changing its description:

decodable conn update 3e91ade5 --description "Production MSK cluster in us-west-2."
# Updated connection "3e91ade5"

Updating a connection and modifying a property:

decodable conn update 3e91ade5 --prop topic=other_topic
# Updated connection "3e91ade5"

Seeing our changes:

decodable conn get 3e91ade5
# kafka_prod
#   id                       3e91ade5
#   description              Production MSK cluster in us-west-2.
#   connector                kafka
#   type                     source
#   stream id                2fc5dc5a
#   schema
#     0  raw_data              STRING
#     1  new_field             STRING
#   properties
#     properties.bootstrap.servers a:9093,b:9093,c:9093
#     topic                    other_topic
#     value.format             json
#   target state             STOPPED
#   actual state             STOPPED
#   requested tasks          1
#   actual tasks             0
#   create time              2021-11-03T22:07:13Z
#   update time              2021-11-03T22:28:13Z
#   last runtime error       <none>
#   metrics                  <none>

Activating the connection:

decodable conn activate 3e91ade5 --tasks 1
# kafka_prod
#   id                       3e91ade5
#   description              Production MSK cluster in us-west-2.
#   connector                kafka
#   type                     source
#   stream id                2fc5dc5a
#   schema
#     0  raw_data              STRING
#     1  new_field             STRING
#   properties
#     properties.bootstrap.servers a:9093,b:9093,c:9093
#     topic                    other_topic
#     value.format             json
#   target state             RUNNING
#   actual state             STARTING
#   requested tasks          1
#   actual tasks             0
#   create time              2021-11-03T22:07:13Z
#   update time              2021-11-03T22:28:13Z
#   last runtime error       <none>
#   metrics                  <none>

Deactivating the connection:

decodable conn deactivate 3e91ade5
# kafka_prod
#   id                       3e91ade5
#   description              Production MSK cluster in us-west-2.
#   connector                kafka
#   type                     source
#   stream id                2fc5dc5a
#   schema
#     0  raw_data              STRING
#     1  new_field             STRING
#   properties
#     properties.bootstrap.servers a:9093,b:9093,c:9093
#     topic                    other_topic
#     value.format             json
#   target state             STOPPED
#   actual state             STOPPED
#   requested tasks          1
#   actual tasks             0
#   create time              2021-11-03T22:07:13Z
#   update time              2021-11-03T22:28:13Z
#   last runtime error       <none>
#   metrics                  <none>

Connector Reference

Check out the Connector Reference for more information about supported connectors and the properties they support.

