Introducing the new MySQL and Snowflake connectors

Welcome to the early access for the new MySQL CDC and Snowflake connectors!

Decodable previously enforced a 1:1 relationship between connections and streams. This was a byproduct of being built on top of Apache Flink, which adheres to the “one connector, one table” paradigm, and it meant that each connection could only connect to a single stream. However, managing multiple tables in your database, whether for reading or writing data, is cumbersome if you need to create a connection for every single table.

To address this limitation, Decodable is moving away from Apache Flink’s 1:1 paradigm and now allows a single connection to ingest data from, or deliver data to, multiple tables and streams. The MySQL CDC source connector and the Snowflake sink connector are the first two connectors to support this new behavior. For example, you can now ingest data from multiple MySQL tables, and even multiple databases on the same host, using a single MySQL connection. This connection can then route data to multiple streams within Decodable.

This feature will gradually extend to other connectors through a rolling-release process.

Getting started

The best way to manage a connection with many inputs and outputs is through the Decodable CLI. The following links can be used to download the prerelease versions of the Decodable CLI, which enable the behavior discussed in this document.

  • macOS (AMD64/Intel): Download | SHA

  • macOS (ARM/Apple Silicon): Download | SHA

  • Linux (AMD64/Intel): Download | SHA

The instructions for installing and configuring these prerelease builds are the same as for the production version of the CLI.

Creating a connection

To create a connection that can be used with many inputs and outputs, use the Decodable CLI’s scan and apply commands. For more information about the syntax for these commands, see the Decodable CLI Release notes section in the Appendix.

  1. Create any necessary Decodable secrets for the connection type.

    decodable secret create --name mysql-password
    decodable secret write <id> --value <credential>
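
    For the Snowflake sink shown later in this document, authentication uses a private key rather than a password, so create a secret for it in the same way. The secret name below is an illustrative choice, and the value should contain the key body without its header and footer lines:

    decodable secret create --name snowflake-private-key
    decodable secret write <id> --value <private-key-without-header-and-footer>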
  2. Run a connection scan to retrieve tables and their associated schemas. See the MySQL Property Names and Snowflake Property Names sections in the Appendix for more information on the properties that must be included for each connector.
    Example for MySQL scan:

    decodable connection scan --connector mysql-cdc --type source \
                              --stream-name-template poc-{table-name} \
                              --prop hostname="<hostname>" --prop port=3306 \
                              --prop username=<username> --prop password=<secret-id> \
                              --name poc_mysql_cdc --opt with-metadata-fields=true \
                              > db_scan_single.yaml

Notes for source scans:

  • When running a source scan, any NOT NULL constraint will be omitted for any column that’s not part of the primary key, unless --opt preserve-not-nulls=true is specified.

    • This allows NOT NULL constraints to be dropped from source table columns later, and NULL values to subsequently appear in those columns in new records, without the need to alter downstream tables or other resources. It also allows columns to be added downstream in database systems that reject adding a column with a NOT NULL constraint to a table that already contains rows. In scenarios involving downstream analytics platforms, such as Snowflake, this is typically acceptable.

  • The --opt with-metadata-fields=true flag will decorate each stream with the _decodable_op_ts metadata field, which tracks the timestamp of the last change to each row.

  • The --stream-name-template flag unions all like-named tables into a single output stream, and the --opt with-metadata-fields=true flag decorates each stream with the _decodable_database and _decodable_table metadata fields.
    Example for Snowflake scan:

    decodable connection scan --connector snowflake --prop snowflake.database="<your-db>" \
                              --prop snowflake.schema="<your-schema>"  \
                              --prop snowflake.user="<user>" --prop snowflake.role="<role>" \
                              --prop snowflake.account-name="<your-account>" \
                              --prop snowflake.warehouse="<your-warehouse>" \
                              --prop snowflake.private-key="<secret-id-from-step-1>" \
                              --prop snowflake.merge-interval="1 min" \
                              --prop snowflake.merge-parallelism="5" \
                              --prop snowflake.ingest-io-cpu-ratio=8 \
                              --name poc_snowflake_cdc  --type sink  --stream-prefix poc- \
                              > snowflake_sink_scan.yaml
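
The scan command writes a declarative resource specification in YAML. The exact structure can vary by CLI version, so treat the abbreviated MySQL source example below as an illustrative sketch only; the field names and layout are assumptions made for this document, and the file generated by your own scan is the authoritative reference.

    # Hypothetical, abbreviated scan output for a MySQL source connection.
    # Field names and layout are illustrative assumptions; rely on the file
    # that your own scan generates.
    kind: connection
    metadata:
      name: poc_mysql_cdc
    spec:
      connector: mysql-cdc
      type: source
      properties:
        hostname: <hostname>
        port: 3306
        username: <username>
        password: <secret-id>
      stream_mappings:
        - stream_name: poc-customers
          external_resource_specifier:
            database-name: inventory
            table-name: customers
        - stream_name: poc-orders
          external_resource_specifier:
            database-name: inventory
            table-name: orders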

After running the scan command, you’ll have a YAML file that can be used by the apply command to provision resources in Decodable and downstream data systems. To finish creating the connection, do the following steps:

  1. Verify that the YAML output is correct.

  2. Apply any edits or transformations if needed. For example, you might need to split the YAML into several files to distribute table load across multiple connection instances, with each YAML file corresponding to a single connection.

  3. Apply the YAML to create the connections and outputs. For source connections, this creates Decodable streams, and for sink connections, it creates tables in the external system.

    decodable connection apply db_scan_single.yaml
  4. Activate the connection.

    decodable connection activate <id>
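
You can then confirm that the expected streams were created. Because the example scan used the stream name template poc-{table-name}, the new stream names all start with poc-; the grep filter below is just one convenient way to narrow the listing:

    # List streams and filter for the ones created by the example source scan.
    decodable stream list | grep poc-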

Updating a connection

The method that you use to update a connection depends on what type of change you are making to it.

Reason: You added or removed tables from the source

If you added or removed tables from the source database and want to update which tables are associated with your Decodable connection, use the scan and apply Decodable CLI commands and do the following steps.

  1. Stop the connection.

    1. Retrieve the ID of the connection that you want to stop.

      decodable connection list
    2. Stop the connection.

      decodable connection deactivate <id>
  2. Scan the external database for changes. See the Creating a connection section, on this page, for examples of how to format the scan command.

  3. At this point, you should have two YAML configuration files for the same connection. Combine the contents of the two YAML files as needed. For example, you can route multiple input tables to the same stream or split tables across multiple source connections by modifying the corresponding properties (see the sketch after these steps).

  4. Apply the changes to finish creating the connection and streams.

    decodable connection apply <name_of_yaml_file>.yaml
  5. Activate the connection.

    decodable connection activate <id>
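
As a sketch of the kind of edit described in step 3, stream mappings can be combined or re-routed before applying. The layout below reuses the illustrative field names from the earlier scan-output sketch; they are assumptions, not a guaranteed format:

    # Hypothetical edit: route two source tables into the same stream by giving
    # both mappings the same stream_name. Field names are illustrative.
    stream_mappings:
      - stream_name: poc-orders
        external_resource_specifier:
          database-name: shop_eu
          table-name: orders
      - stream_name: poc-orders
        external_resource_specifier:
          database-name: shop_us
          table-name: orders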

Important notes:

  • Table creation and removal aren’t automatically propagated through Decodable. If you add or drop a table from a source connection, you will want to add or drop that table in the downstream sink connection as well.

  • Dropping a stream-mapping doesn’t delete the Decodable stream (for a source connection) or external table (for a sink connection) that the Decodable connection was outputting to. It simply stops the connection from writing to that output in the future.

  • If a table is dropped from the upstream source but not in Decodable, the connection will continue to run without issue. However, if the connection is restarted without updating its stream mappings, the connection will produce an error. Therefore, it’s essential to always remove the connection’s stream mapping if the upstream table is dropped.

Reason: You’ve modified the schema of a table

If you modified the schema of an existing table by adding a column and you want to reflect that change in Decodable, use the scan and apply commands.

  1. Stop the connection from the Connections page or by running the following CLI commands:

    1. Retrieve the ID of the connection that you want to stop.

      decodable connection list

    2. Stop the connection.

      decodable connection deactivate <id>

  2. Run a connection scan to retrieve updated schemas. See the Creating a connection section, on this page, for examples of how to format the scan command.

  3. Apply the newly generated YAML to update the connections and outputs using the following commands:

    Example

    decodable connection apply --opt allow-evolution=true <name_of_yaml_file>.yaml

    Example that also preserves casing for Snowflake resource names and columns

    decodable connection apply <name_of_yaml_file>.yaml  \
        --opt allow-evolution=true \
        --opt preserve-case-for-resource-names=true \
        --opt preserve-case-for-column-names=true
  4. Start the connection.

    decodable connection activate <id>

However, if you modified the schema of an existing table by updating or removing an existing column, the scan and apply commands currently can’t be used to make those changes. Perform the following steps instead:

  5. Update your Decodable stream schema using Decodable Web or the Decodable CLI.

    1. If you are using Decodable Web:

      1. Stop the connection by navigating to the Connections page, selecting the connection that’s attached to the stream that you want to modify, and selecting Stop.

      2. Locate the stream associated with the changed table by navigating to the Streams page and searching for the stream name.

      3. Select the stream to go to its Overview page.

      4. Select Schema and update the stream’s schema so that it matches the schema of the changed table. If you are adding a field to the stream’s schema, make sure to assign the right data type to the field. For guidance on how Decodable data types correspond to third-party data types, refer to MySQL Data Types Mapping or Snowflake Data Types Mapping.

      5. Start the connection again.

    2. If you are using the Decodable CLI:

      1. Stop the connection by doing the following.

        • Retrieve the ID of the connection that you want to stop.

          decodable connection list
        • Stop the connection.

          decodable connection deactivate <id>
      2. Locate the stream associated with the changed table, and copy the 8-character stream ID.

        decodable stream list
      3. Update the stream’s schema so that it matches the schema of the changed table. You must provide the complete, updated schema for the stream, not just the changed fields.

        decodable stream update <id> --field <name>=<type> --field <name2>=<type>

        If you are adding a field to the stream’s schema, make sure to assign the right data type to the field. For guidance on how Decodable data types correspond to third-party data types, refer to the relevant Data Types Mapping sections.

      4. Verify that the stream’s schema matches the schema of the modified table by comparing the output from the Decodable CLI to the table’s schema.

        decodable stream get <id>
      5. Start the connection.

        decodable connection activate <id>
  6. If you are sending data to Snowflake, you must also update your Snowflake tables.

    1. Navigate to the Snowflake querying console.

    2. Update the sink table accordingly.

      alter table <table> drop column <column_name>;
      alter table <table> alter column <column_name> ...;

      For more information, see the Snowflake documentation for ALTER TABLE.

    3. Update the schema of the corresponding staging table as needed. Staging tables are found in the same schema as the sink table, and are of the form _<table>_DECODABLE_STAGE. See the example after this section.
      Important: Manually updating Decodable stream and Snowflake table schemas can cause your YAML files to diverge from the source of truth. Contact us if you need assistance with updates, or if your YAML file has diverged.
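
To illustrate steps 2 and 3 above, here is a hypothetical set of statements for a sink table named ORDERS whose stream dropped one column and widened another; the table, column, and type names are illustrative only:

      -- Mirror the stream changes on the sink table (illustrative names).
      alter table ORDERS drop column LEGACY_CODE;
      alter table ORDERS alter column AMOUNT set data type NUMBER(12,2);
      -- Apply the same changes to the staging table used by the connector.
      alter table _ORDERS_DECODABLE_STAGE drop column LEGACY_CODE;
      alter table _ORDERS_DECODABLE_STAGE alter column AMOUNT set data type NUMBER(12,2);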

Appendix

MySQL property names and data type mappings

See the following sections for information about MySQL property names and the MySQL to Decodable data type mappings.

Property names

The following property names are available when configuring a MySQL connection.

  • hostname: The host name for your MySQL database. For example, mysql-server.

  • port: Optional. The port number to use when connecting to the host. Defaults to 3306.

  • username: The username to use to authenticate to MySQL.

  • password: The password associated with the username. This is the ID of the secret resource containing the password.

  • database-name: Optional. The name of the MySQL database that you want to connect to. If not provided, we will scan all databases on the host.

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding MySQL data types.

MySQL Type                                            Decodable Type
TINYINT, TINYINT(>1)                                  TINYINT
SMALLINT, TINYINT UNSIGNED                            SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED                     INT
BIGINT, INT UNSIGNED                                  BIGINT
BIGINT UNSIGNED                                       DECIMAL(20,0)
FLOAT                                                 FLOAT
DOUBLE, DOUBLE PRECISION                              DOUBLE
NUMERIC(p,s), DECIMAL(p,s), DEC(p,s), FIXED(p,s)      DECIMAL(p,s)
BOOLEAN, TINYINT(1), BIT                              BOOLEAN
BIT(>1)                                               BYTES
YEAR                                                  INT
DATE                                                  DATE
TIME [(p)]                                            TIME [(p)]
TIMESTAMP [(p)], DATETIME [(p)]                       TIMESTAMP [(p)]
CHAR(n)                                               CHAR(n)
VARCHAR(n)                                            VARCHAR(n)
BINARY(n)                                             BINARY(n)
VARBINARY(n)                                          VARBINARY(n)
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT                  STRING
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB                  BYTES
JSON, ENUM, SET                                       STRING
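
As a worked example of the mapping above, consider a hypothetical MySQL table; the comments show the Decodable type that a source scan would produce for each column (table and column names are illustrative):

    -- Hypothetical MySQL table; comments show the resulting Decodable types
    -- per the mapping table above. NOT NULL is kept only on the primary key
    -- unless --opt preserve-not-nulls=true is passed to the scan.
    CREATE TABLE customers (
      id         BIGINT UNSIGNED NOT NULL PRIMARY KEY,  -- DECIMAL(20,0) NOT NULL
      name       VARCHAR(255) NOT NULL,                 -- VARCHAR(255)
      is_active  TINYINT(1),                            -- BOOLEAN
      balance    DECIMAL(10,2),                         -- DECIMAL(10,2)
      created_at DATETIME(3),                           -- TIMESTAMP(3)
      birth_year YEAR,                                  -- INT
      notes      TEXT,                                  -- STRING
      metadata   JSON                                   -- STRING
    );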

Snowflake property names and data type mappings

See the following sections for information about Snowflake property names and the Decodable to Snowflake data type mappings.

Property names

The following property names are available when configuring a Snowflake connection.

  • snowflake.database: The name of the database containing the table that you want to send data to.

  • snowflake.schema: The name of the schema containing the table that you want to send data to.

  • snowflake.table: The name of an existing table that you want to send data to. Note: The schema of the table must match the schema of the data being sent through the Snowflake connector.

  • snowflake.user: The name of the user sending data to the Snowflake table. The user must be granted usage permissions on the provided role.

  • snowflake.private-key: The private key associated with the user, provided as a secret resource. The secret resource should contain the private key, excluding the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header and footer.

  • snowflake.role: The name of the role to use for all Snowflake actions. The role must be granted the following privileges (see the example after this list).

    If you are sending data from an append stream:

    - INSERT privileges on the Snowflake table.

    - USAGE privileges on the schema and database.

    If you are sending data from change streams, you must additionally have the following privileges:

    - INSERT, UPDATE, SELECT, DELETE privileges on the Snowflake table.

    - USAGE privileges on the warehouse.

    - CREATE TABLE privileges on the schema.

  • snowflake.account-name: The name of your Snowflake account, formatted as <organization>-<name>. If you are using the Snowsight web interface, select Admin > Accounts. The organization is listed above the account name. If you are using the classic web interface, select the user profile in the dropdown to reveal the organization ID and account name.

  • snowflake.warehouse: The name of the Snowflake warehouse that you configured in the prerequisites. For append streams, Decodable uses warehouses to create the sink tables during connection creation. For change streams, warehouses are used to continuously merge data into the sink Snowflake tables.

  • snowflake.merge-interval: Change records are sent to the sink table in batches. Define the interval to wait before sending the change records into the table. For example, 1 minute, 5h, 100 sec, etc. Supported time unit labels are:

    - Days: d, day

    - Hours: h, hour

    - Minutes: min, minute

    - Seconds: s, sec, second

    - Milliseconds: ms, milli, millisecond

    - Microseconds: µs, micro, microsecond

    - Nanoseconds: ns, nano, nanosecond

    Required for change streams, unsupported for append streams.
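
As a sketch of the privileges listed for snowflake.role, the following statements grant them to a role used by a change-stream sink. The database, schema, warehouse, table, and role names are illustrative placeholders:

    -- Illustrative grants for a change-stream sink (all names are placeholders).
    grant usage on database ANALYTICS to role DECODABLE_ROLE;
    grant usage on schema ANALYTICS.PUBLIC to role DECODABLE_ROLE;
    grant usage on warehouse DECODABLE_WH to role DECODABLE_ROLE;
    grant create table on schema ANALYTICS.PUBLIC to role DECODABLE_ROLE;
    grant insert, update, select, delete on table ANALYTICS.PUBLIC.ORDERS to role DECODABLE_ROLE;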

Data types mapping

The following table describes the mapping of Decodable data types to their Snowflake data type counterparts.

Decodable Type             Snowflake Type
Char                       CHARACTER
VarChar/String             VARCHAR
Boolean                    BOOLEAN
Binary                     BINARY
VarBinary, Bytes           VARBINARY
Decimal, Dec, Numeric      NUMBER
TinyInt                    TINYINT
SmallInt                   SMALLINT
Int                        INT
BigInt                     BIGINT
Float                      FLOAT
Double                     DOUBLE
Date                       DATE
Time                       TIME
Timestamp                  TIMESTAMP_NTZ
Timestamp_ltz              TIMESTAMP_LTZ
Array                      ARRAY
Map                        OBJECT
Row                        Not supported
MultiSet                   Not supported
Interval                   Not supported
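
As a worked example, a hypothetical Decodable stream with the field types shown in the comments would correspond to a Snowflake sink table shaped roughly as follows; the DDL that the connector actually generates may differ, and all names are illustrative:

    -- Illustrative shape of a sink table per the mapping above (names and
    -- exact DDL are assumptions, not the connector's actual output).
    create table POC_CUSTOMERS (
      id         NUMBER(20,0),      -- from Decodable DECIMAL(20,0)
      name       VARCHAR(255),      -- from VARCHAR(255)
      is_active  BOOLEAN,           -- from BOOLEAN
      created_at TIMESTAMP_NTZ(3),  -- from TIMESTAMP(3)
      notes      VARCHAR            -- from STRING
    );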

Decodable CLI Release notes

The prerelease Decodable CLI v1.9.0 adds two commands, scan and apply, that are used for creating MySQL or Snowflake connections.

  • Scan: This step creates a YAML specification for a connection with all relevant information. The scan operation infers the available databases/tables in the external system for a source connection and the available streams in Decodable for a sink connection.

  • Apply: This step applies the YAML, creating the specified streams in Decodable (for source connectors) or external resources (for sink connectors). It also creates the connection. Note that if targets (streams or external resources) already exist, the schema must match what’s written in the YAML file. Otherwise, the operation fails.

Note that there are technical differences between source and sink scans:

  • Scanning a source: A source scan actually scans the databases/tables available in the external system and infers the corresponding stream mappings.

  • Scanning a sink: A sink scan merely scans the available streams in Decodable and infers the corresponding stream mappings to external tables. There is no interaction with external resources. Despite this difference, a sink scan command requires all connection properties to be specified (like username and password/key for the external system), to maintain symmetry with source scans.

Scan options

Required flags:

  • --connector: The connector that you want to use. Either mysql-cdc or snowflake.

  • --type: Either source or sink.

  • --name: A name for the connection.

  • --prop: Required properties defined by the connector. See the MySQL Property Names or Snowflake Property Names sections in the Appendix.

Optional flags

  • --description: A description for the connection.

Source-only flags

  • --stream-prefix: A common prefix to add to all created stream names.

  • --stream-name-template: A template to use to create stream names. You can specify properties included in the source’s external resource specifier as template variables, enclosed in curly braces. For MySQL, the available specifiers are {database-name} and {table-name}.

    • For example, the template my-stream-{database-name}-{table-name} would produce a stream name like my-stream-db01-customers for the table customers in the database db01.

  • --prop database-name: If specified, restricts the scan to a single database instead of scanning all databases on the host.

  • --opt with-metadata-fields=true: Include the metadata fields _decodable_database, _decodable_table, and _decodable_op_ts in the connection and created streams. Useful when unioning tables into a single stream, and when tracking when a given row was last updated. The _decodable_database and _decodable_table metadata fields are added to the stream primary key as well.

    • Note that _decodable_op_ts defaults to 1970-01-01 00:00:00.000 for all existing records in the upstream MySQL table, upon first starting the connector. All records inserted or updated following the initial connection activation will be populated with the UTC-based timestamp of the operation.

  • --opt preserve-not-nulls=true: Preserve NOT NULL constraints on non-primary-key columns in the created stream.

Sink-only flags

  • --stream-prefix: Filter Decodable streams by this prefix, creating stream mappings only for the resulting set of streams. Useful when --stream-prefix or --stream-name-template were used in the creation of source streams, allowing you to select only those streams.

Apply options

Optional flags

  • --opt allow-evolution=true: Apply any new columns detected during a connection scan.

Sink-only flags

  • --opt preserve-case-for-resource-names=true: Preserve casing for resource names.

  • --opt preserve-case-for-column-names=true: Preserve casing for column names.

Notes about updating a MySQL or Snowflake connection using the Decodable CLI

  • For both source (stream creation) and sink (external resource creation), there is no difference between the create and update flow.

    • If the table or stream doesn’t exist yet, they’ll be created.

    • If the table or stream does exist and the schema matches, the existing one is used.

    • If the table or stream does exist but the schema doesn’t match, the command fails.

  • For the apply command, the CLI automatically detects whether a connection with the specified name already exists and creates or updates the connection accordingly.

    • Connection properties that aren’t included in the request aren’t touched (for example, description or specific props).

    • Stream mappings are replaced with the list in the YAML. The apply command doesn’t do any merging itself.