Introducing the new MySQL and Snowflake connectors

Welcome to the early access for the new MySQL CDC and Snowflake connectors!

Decodable previously enforced a 1:1 relationship between connections and streams. This is a byproduct of being built on top of Apache Flink, which adheres to the “one connector, one table” paradigm. This meant that each connection could only connect to a single stream. However, managing multiple tables in your database, whether for reading or sending data, is cumbersome if you need to create a connection for every single table.

To address this limitation, Decodable is transitioning away from Apache Flink’s 1:1 paradigm, allowing data ingestion and consumption from multiple tables into streams using a single connection. The MySQL CDC source connector and the Snowflake sink connector are the first two connectors that support this new behavior. Now, for example, you can ingest data from multiple MySQL tables and even databases on the same host using a single MySQL connection. This connection can then route data to multiple streams within Decodable.

This feature will gradually extend to other connectors through a rolling-release process.

Getting started

The best way to manage a connection with many inputs and outputs is through the Decodable CLI. The following links can be used to download the prerelease versions of the Decodable CLI, which enable the behavior discussed in this document.

Operating system | Download
macOS (AMD64/Intel) | Download, SHA
macOS (ARM/Apple Silicon) | Download, SHA
Linux (AMD64/Intel) | Download, SHA

The instructions for installing and configuring these are the same as for installing the production version of the CLI.

Creating a connection

To create a connection that can be used with many inputs and outputs, use the Decodable CLI’s scan and apply commands. For more information about the syntax for these commands, see the Decodable CLI Release notes section in the Appendix.
Create any necessary Decodable secrets for the connection type.

    decodable secret create --name mysql-password
    decodable secret write <id> --value <credential>

Run a connection scan to retrieve tables and their associated schemas. See the MySQL Property Names and Snowflake Property Names sections in the Appendix for more information on the properties that must be included for each connector.

Example for MySQL scan:

    decodable connection scan --connector mysql-cdc --type source \
      --stream-name-template poc-{table-name} \
      --prop hostname="<hostname>" --prop port=3306 \
      --prop username="<username>" --prop password="<secret-id>" \
      --name poc_mysql_cdc --opt with-metadata-fields=true \
      > db_scan_single.yaml

Notes for source scans:

- When running a source scan, the NOT NULL constraint is omitted for any column that’s not part of the primary key, unless --opt preserve-not-nulls=true is specified. This allows NOT NULL constraints to be dropped from source table columns later, with NULL values then appearing in those columns in new records, without the need to alter downstream tables or other resources. It also allows columns to be added downstream in database systems that reject adding a NOT NULL column to a table that already contains rows. In scenarios involving downstream analytics platforms, such as Snowflake, this is typically acceptable.
- The --opt with-metadata-fields=true flag decorates each stream with the _decodable_op_ts metadata field, which tracks the timestamp of the last change to each row.
- Used together, the --stream-name-template flag unions all like tables into a single output stream, and the --opt with-metadata-fields=true flag decorates each stream with the _decodable_database and _decodable_table metadata fields.
Example for Snowflake scan:

    decodable connection scan --connector snowflake --prop snowflake.database="<your-db>" \
      --prop snowflake.schema="<your-schema>" \
      --prop snowflake.user="<user>" --prop snowflake.role="<role>" \
      --prop snowflake.account-name="<your-account>" \
      --prop snowflake.warehouse="<your-warehouse>" \
      --prop snowflake.private-key="<secret-id-from-step-1>" \
      --prop snowflake.merge-interval="1 min" \
      --prop snowflake.merge-parallelism="5" \
      --prop snowflake.ingest-io-cpu-ratio=8 \
      --name poc_snowflake_cdc --type sink --stream-prefix poc- \
      > snowflake_sink_scan.yaml

After running the scan command, you’ll have a YAML file that can be used by the apply command to provision resources in Decodable and downstream data systems. Before applying the YAML file, do the following steps:

- Verify that the YAML output is correct.
- Apply any edits or transformations if needed. For example, you might need to split the YAML into several files to distribute table load across multiple connection instances, with each YAML file corresponding to a single connection.

Apply the YAML to create the connections and outputs. For source connections, this creates Decodable streams, and for sink connections, it creates tables in the external system.

    decodable connection apply db_scan_single.yaml

Activate the connection.

    decodable connection activate <id>

Updating a connection

The method that you use to update a connection depends on what type of change you are making to it.

- See Reason: You added or removed tables from the source for table changes.
- See Reason: You’ve modified the schema of a table for schema evolution changes.

Reason: You added or removed tables from the source

If you added or removed tables from the source database and want to update which tables are associated with your Decodable connection, use the scan and apply Decodable CLI commands and do the following steps.

Stop the connection.

- Retrieve the ID of the connection that you want to stop.

    decodable connection list

- Stop the connection.

    decodable connection deactivate <id>

Scan the external database for changes. See the Creating a connection section, on this page, for examples of how to format the scan command. At this point, you should have two YAML configuration files for the same connection.

Combine the contents of the two YAML files as needed. For example, you can route multiple input tables to the same stream or split tables across multiple source connections by modifying the corresponding properties.

Apply the changes to finish creating the connection and streams.

    decodable connection apply <name_of_yaml_file>.yaml

Activate the connection.

    decodable connection activate <id>

Important notes:

- Table creation and removal aren’t automatically propagated through Decodable. If you add or drop a table from a source connection, you will want to add or drop that table in the downstream sink connection as well.
- Dropping a stream mapping doesn’t delete the Decodable stream (for a source connection) or external table (for a sink connection) that the Decodable connection was outputting to. It simply stops the connection from writing to that output in the future.
- If a table is dropped from the upstream source but not in Decodable, the connections will continue to run without issue. However, if the connection is restarted without updating its stream mappings, the connection will produce an error. Therefore, it’s essential to always remove the connection’s stream mapping if the upstream table is dropped.

Reason: You’ve modified the schema of a table

If you modified the schema of an existing table by adding a column and you want to reflect that change in Decodable, use the scan and apply commands.

Stop the connection from the Connections page or by running the following CLI commands:

- Retrieve the ID of the connection that you want to stop.

    decodable connection list

- Stop the connection.

    decodable connection deactivate <id>

Run a connection scan to retrieve updated schemas. See the Creating a connection section, on this page, for examples of how to format the scan command.

Apply the newly generated YAML to update the connections and outputs using the following commands:

Example:

    decodable connection apply --opt allow-evolution=true <name_of_yaml_file>.yaml

Example to also preserve casing for Snowflake resource names or columns:

    decodable connection apply <name_of_yaml_file>.yaml \
      --opt allow-evolution=true \
      --opt preserve-case-for-resource-names=true \
      --opt preserve-case-for-column-names=true

Start the connection.

    decodable connection activate <id>

However, if you modified the schema of an existing table by updating or removing an existing column, the scan and apply commands currently can’t be used to achieve such changes. Perform the following steps instead:

Update your Decodable stream schema using Decodable Web or the Decodable CLI.

If you are using Decodable Web:

- Stop the connection by navigating to the Connections page, selecting the connection that’s attached to the stream that you want to modify, and selecting Stop.
- Locate the stream associated with the changed table by navigating to the Streams page and searching for the stream name.
- Select the stream to go to its Overview page.
- Select Schema and update the stream’s schema so that it matches the schema of the changed table. If you are adding a field to the stream’s schema, make sure to assign the right data type to the field. For guidance on how Decodable data types correspond to third-party data types, refer to MySQL Data Types Mapping or Snowflake Data Types Mapping.
- Start the connection again.

If you are using the Decodable CLI:

Stop the connection by doing the following.

- Retrieve the ID of the connection that you want to stop.

    decodable connection list

- Stop the connection.

    decodable connection deactivate <id>

Locate the stream associated with the changed table, and copy the 8-character stream ID.

    decodable stream list

Update the stream’s schema so that it matches the schema of the changed table. You must provide the complete, updated schema for the stream, not just the changed fields.

    decodable stream update <id> --field <name>=<type> --field <name2>=<type>

If you are adding a field to the stream’s schema, make sure to assign the right data type to the field. For guidance on how Decodable data types correspond to third-party data types, refer to the relevant Data Types Mapping sections:

- MySQL Data Types Mapping
- Snowflake Data Types Mapping

Verify that the stream’s schema matches the schema of the modified table by comparing the output from the Decodable CLI to the table’s schema.

    decodable stream get <id>

Start the connection.

    decodable connection activate <id>

If you are sending data to Snowflake, you must also update your Snowflake tables.

Navigate to the Snowflake querying console.

Update the sink table accordingly.

    alter table <table> drop column <column_name>;
    alter table <table> alter column <column_name> ...;

For more information, see the following two pages:

- Snowflake Data Types Mapping, for how Decodable types map to Snowflake types.
- The "Alter table…alter column" topic in the Snowflake documentation, for alter column usage information.

Update the schema of the corresponding staging table as needed. Staging tables are found in the same schema as the sink table, and are of the form _<table>_DECODABLE_STAGE.

Important: Manually updating Decodable stream and Snowflake table schemas can cause your YAML files to diverge from the actual state of your resources. Contact us if you need assistance with updates, or if your YAML file has diverged.

Appendix

MySQL property names and data type mappings

See the following sections for information about MySQL property names and the MySQL to Decodable data type mappings.
Property names

The following property names are available when configuring a MySQL connection. Each entry lists the property name in the Decodable CLI, followed by its description.

hostname
    The host name for your MySQL database. For example, mysql-server.
port
    Optional. The port number to use when connecting to the host. Defaults to 3306.
username
    The username to use to authenticate to MySQL.
password
    The password associated with the username. This is the ID of the secret resource containing the password.
database-name
    Optional. The name of the MySQL database that you want to connect to. If not provided, all databases on the host are scanned.

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding MySQL data types.

MySQL Type | Decodable Type
TINYINT, TINYINT(>1) | TINYINT
SMALLINT, TINYINT UNSIGNED | SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED | INT
BIGINT, INT UNSIGNED | BIGINT
BIGINT UNSIGNED | DECIMAL(20,0)
FLOAT | FLOAT
DOUBLE, DOUBLE PRECISION | DOUBLE
NUMERIC(p, s), DECIMAL(p, s), DEC(p, s), FIXED(p, s) | DECIMAL(p, s)
BOOLEAN, TINYINT(1), BIT | BOOLEAN
BIT(>1) | BYTES
YEAR | INT
DATE | DATE
TIME [(p)] | TIME [(p)]
TIMESTAMP [(p)], DATETIME [(p)] | TIMESTAMP [(p)]
CHAR(n) | CHAR(n)
VARCHAR(n) | VARCHAR(n)
BINARY(n) | BINARY(n)
VARBINARY(n) | VARBINARY(n)
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES
JSON, ENUM, SET | STRING

Snowflake property names and data type mappings

See the following sections for information about Snowflake property names and the Decodable to Snowflake data type mappings.

Property names

The following property names are available when configuring a Snowflake connection. Each entry lists the property name in the Decodable CLI, followed by its description.

snowflake.database
    The name of the database containing the table that you want to send data to.
snowflake.schema
    The name of the schema containing the table that you want to send data to.
snowflake.table
    The name of an existing table that you want to send data to.
    Note: The schema of the table must match the schema of the data being sent through the Snowflake connector.
snowflake.user
    The name of the user sending data to the Snowflake table. The user must be granted usage permissions on the provided role.
snowflake.private-key
    The private key associated with the user, provided as a secret resource. The secret resource should contain the private key, excluding the -----BEGIN PRIVATE KEY----- header and the -----END PRIVATE KEY----- footer.
snowflake.role
    The name of the role to use for all Snowflake actions. The role must be granted the following privileges.
    If you are sending data from an append stream:
    - INSERT privileges on the Snowflake table.
    - USAGE privileges on the schema and database.
    If you are sending data from change streams, you must additionally have the following privileges:
    - INSERT, UPDATE, SELECT, DELETE privileges on the Snowflake table.
    - USAGE privileges on the warehouse.
    - CREATE TABLE privileges on the schema.
snowflake.account-name
    The name of your Snowflake account, formatted as <organization>-<name>. If you are using the Snowsight web interface, select Admin > Accounts. The organization is listed above the account name. If you are using the classic web interface, select the user profile in the dropdown to reveal the organization ID and account name.
snowflake.warehouse
    The name of the Snowflake warehouse that you configured in the prerequisites. For append streams, Decodable uses warehouses to create the sink tables during connection creation. For change streams, warehouses are used to continuously merge data into the sink Snowflake tables.
snowflake.merge-interval
    Change records are sent to the sink table in batches. Define the interval to wait before sending the change records into the table. For example, 1 minute, 5h, 100 sec, etc.
    Supported time unit labels are:
    - Days: d, day
    - Hours: h, hour
    - Minutes: min, minute
    - Seconds: s, sec, second
    - Milliseconds: ms, milli, millisecond
    - Microseconds: µs, micro, microsecond
    - Nanoseconds: ns, nano, nanosecond
    Required for change streams, unsupported for append streams.

Data types mapping

The following table describes the mapping of Decodable data types to their Snowflake data type counterparts.

Decodable Type | Snowflake Type
Char | CHARACTER
VarChar/String | VARCHAR
Boolean | BOOLEAN
Binary | BINARY
VarBinary, Bytes | VARBINARY
Decimal, Dec, Numeric | NUMBER
TinyInt | TINYINT
SmallInt | SMALLINT
Int | INT
BigInt | BIGINT
Float | FLOAT
Double | DOUBLE
Date | DATE
Time | TIME
Timestamp | TIMESTAMP_NTZ
Timestamp_ltz | TIMESTAMP_LTZ
Array | ARRAY
Map | OBJECT
Row | Not supported
MultiSet | Not supported
Interval | Not supported

Decodable CLI Release notes

The prerelease Decodable CLI v1.9.0 adds two commands, scan and apply, that are used for creating MySQL or Snowflake connections.

Scan: This step creates a YAML specification for a connection with all relevant information. The scan operation infers the available databases/tables in the external system for a source connection and the available streams in Decodable for a sink connection.

Apply: This step applies the YAML, creating the specified streams in Decodable (for source connectors) or external resources (for sink connectors). It also creates the connection. Note that if targets (streams or external resources) already exist, the schema must match what’s written in the YAML file. Otherwise, the operation fails.

Note that there are technical differences between source and sink scans:

- Scanning a source: A source scan actually scans the databases/tables available in the external system and infers the corresponding stream mappings.
- Scanning a sink: A sink scan merely scans the available streams in Decodable and infers the corresponding stream mappings to external tables. There is no interaction with external resources.

Despite this difference, a sink scan command requires all connection properties to be specified (like username and password/key for the external system), to maintain symmetry with source scans.

Scan options

Required flags:

connector: Specify the connector that you want to use. Either mysql-cdc or snowflake.
type: Either source or sink.
name: A name for the connection.
prop: Required properties defined by the connector. See MySQL Property Names or Snowflake Property Names.

Optional flags:

description: A description for the connection.

Source-only flags:

stream-prefix: A common prefix to add to all created stream names.
stream-name-template: A template to use to create stream names. You can specify properties included in the source’s external resource specifier as template variables, enclosed in curly braces. For MySQL, the available specifiers are {database-name} and {table-name}. For example, the template my-stream-{database-name}-{table-name} would produce a stream name like my-stream-db01-customers for the table customers in the database db01.
--prop database-name: If specified, restricts the scan to a single database instead of scanning all databases on the host.
--opt with-metadata-fields=true: Include the metadata fields _decodable_database, _decodable_table, and _decodable_op_ts in the connection and created streams. Useful when unioning tables into a single stream, and when tracking when a given row was last updated. The _decodable_database and _decodable_table metadata fields are added to the stream primary key as well. Note that _decodable_op_ts defaults to 1970-01-01 00:00:00.000 for all existing records in the upstream MySQL table, upon first starting the connector. All records inserted or updated following the initial connection activation will be populated with the UTC-based timestamp of the operation.
--opt preserve-not-nulls=true: Preserve NOT NULL constraints on column types in the created streams.
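
To make the stream-name-template substitution concrete, the following sketch mimics it with a small shell function. The function name render_stream_name and the sed-based substitution are illustrative assumptions, not part of the Decodable CLI, and the sketch assumes that database and table names contain no "/", "&", or backslash characters. It reproduces the my-stream-db01-customers example above, and suggests why a template that omits {database-name} would send same-named tables from different databases to one stream.

```shell
#!/bin/sh
# Hypothetical helper (not part of the Decodable CLI): expands the
# {database-name} and {table-name} specifiers in a stream name template.
# Assumes the names contain no "/", "&", or backslash characters.
render_stream_name() {
  template=$1
  database=$2
  table=$3
  printf '%s\n' "$template" |
    sed -e "s/{database-name}/$database/g" \
        -e "s/{table-name}/$table/g"
}

# The example template from the text above.
render_stream_name 'my-stream-{database-name}-{table-name}' db01 customers

# A template without {database-name} yields the same stream name for
# same-named tables in different databases, so both map to one stream.
render_stream_name 'poc-{table-name}' db01 customers
render_stream_name 'poc-{table-name}' db02 customers
```

When tables are unioned this way, the _decodable_database and _decodable_table metadata fields described above are what keep rows from different origins distinguishable in the shared stream.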

Sink-only flags:

stream-prefix: Filter Decodable streams by this prefix, creating stream mappings only for the resulting set of streams. Useful when stream-prefix or stream-name-template were used in the creation of source streams, allowing you to select only those streams.

Apply options

Optional flags:

--opt allow-evolution=true: Apply any new columns detected during a connection scan.

Sink-only flags:

--opt preserve-case-for-resource-names=true: Preserve casing for resource names.
--opt preserve-case-for-column-names=true: Preserve casing for column names.

Notes about updating a MySQL or Snowflake connection using the Decodable CLI

For both source (stream creation) and sink (external resource creation), there is no difference between the create and update flow.

- If the table or stream doesn’t exist yet, it is created.
- If the table or stream does exist and the schema matches, the existing one is used.
- If the table or stream does exist but the schema doesn’t match, the command fails.

For the apply command, the CLI automatically detects whether a connection with the specified name already exists and creates or updates the connection accordingly.

- Connection properties that aren’t included in the request aren’t touched (for example, description or specific props).
- Stream mappings are replaced with the list in the YAML. The apply command doesn’t do any merging itself.
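
The stop, apply, and start sequence used for schema evolution earlier on this page can be wrapped in a small shell function. This is a sketch under stated assumptions, not an official workflow: the function name apply_schema_evolution and the DECODABLE variable (which lets you swap the CLI binary, for example for echo to get a dry run) are hypothetical, the connection ID abc12345 is a placeholder, and the sketch assumes that each CLI command returns a non-zero exit status on failure. It uses only the deactivate, apply, and activate subcommands and the allow-evolution option shown in this document.

```shell
#!/bin/sh
# Hypothetical wrapper (not part of the Decodable CLI) around the documented
# stop -> apply -> start sequence. Set DECODABLE=echo for a dry run that
# only prints the arguments each command would receive.
apply_schema_evolution() {
  connection_id=$1   # ID reported by "decodable connection list"
  yaml_file=$2       # YAML produced by a fresh "decodable connection scan"
  cli=${DECODABLE:-decodable}

  # Stop the connection before changing its streams or tables.
  "$cli" connection deactivate "$connection_id" || return 1
  # Apply the scanned YAML, allowing newly added columns.
  "$cli" connection apply --opt allow-evolution=true "$yaml_file" || return 1
  # Start the connection again.
  "$cli" connection activate "$connection_id"
}

# Dry run in a subshell so the DECODABLE override does not leak out.
(DECODABLE=echo; apply_schema_evolution abc12345 db_scan_single.yaml)
```

Stopping at the first failing command keeps the function from reactivating a connection whose apply step was rejected, for example because an existing target's schema doesn't match the YAML.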