Snowflake sink connector Features Connector name snowflake Delivery guarantee At least once (when reading from an Append stream) Exactly once (when reading from a Change stream) Supported task sizes S, M, L Multiplex capability A single instance of this connector can write to multiple tables in a single schema and database Supported stream types Append stream Change stream Although the Snowflake connection supports both types of stream, a single Snowflake connection may only use one type of stream. If you want to send data from both change streams and append streams, then you must create separate Snowflake connections. Configuration properties Property Description Required Default snowflake.database The name of the database containing the tables that you want to send data to. Yes snowflake.schema The name of the schema containing the tables that you want to send data to. Yes snowflake.user The name of the user sending data to the Snowflake table. The user must be granted usage permissions on the provided role. Yes snowflake.private-key The private key that you generated in the prerequisites. This must be provided as a secret resource. Make sure that when you create the Decodable secret you do the following: The key must be unencrypted Do not include the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header and footers Yes snowflake.role The name of the role to use for all Snowflake actions. Ensure that the role has the necessary permissions. Yes snowflake.account-name The name of your Snowflake account. This must be formatted as <organization>-<account_name>. The hyphen between the two is important. See below for more information. Yes snowflake.warehouse The name of the Snowflake warehouse used to perform table creation, and data merging for change streams. For more details see Create and set up a Snowflake Warehouse Yes snowflake.merge-interval Change records are sent to the sink table in batches. Define the interval to wait before sending the change records into the table. For example, 1 minute, 5h, 100 sec. Table 1. Supported time unit labels Unit Accepted labels Days d, day Hours h, hour Minutes min, minute Seconds s, sec, second Milliseconds ms, milli, millisecond Microseconds µs, micro, microsecond Nanoseconds ns, nano, nanosecond For further explanation of this setting see Change Stream Materialization Interval. Required if you are using a change stream Not applicable if you are using an append stream Prerequisites Generate a private and public key pair for authentication Perform the following steps to generate a private and public key to secure communications between Decodable and Snowflake. Open a command line interface. Generate an unencrypted private key by running the following command. openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8 -nocrypt The Snowflake connector expects an unencrypted private key. If you want to use an encrypted private key, omit the -nocrypt option from the above command. However, you will need to decrypt it before attaching it to the Snowflake connector. You can decrypt the private key with the following command: openssl rsa -in rsa_key.p8 -out decrypted_rsa_key.p8 Generate a public key by running the following command. For this step, it doesn’t matter whether your private key was encrypted or not. openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub Copy the public key that you generated in the previous step. Copy only the part of the key between the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters. You can do this manually, or use the following command to print the required portion of the key: cat <KEY_FILE> | sed '1d;$d' (Optional) If you want to create a new user in Snowflake to use as a Decodable service account, run the following query. The placeholder in the query is defined as follows: <USER>: The name of the user that you want to create. use role useradmin; CREATE USER <USER>; Assign the public key to an existing user in Snowflake by running the following queries in a Snowflake worksheet. The placeholders in the query are defined as follows: <USER>: The name of the user that you want to assign the public key to. If you performed step 5, this value will be the user that you created. <PUBLIC_KEY>: The public key that you copied in Step 4. The -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters must be excluded. use role securityadmin; alter user "<USER>" set RSA_PUBLIC_KEY='<PUBLIC_KEY>'; Create, configure, and assign a role In this step, you’ll create, configure, and assign a custom Snowflake role with sufficient privileges to your Snowflake user. This enables Decodable’s Snowflake connector to be able to interact with the Snowflake tables. Open a Snowflake worksheet. (Optional) If you want to create a new role in Snowflake to use as the Decodable service account role, run the following query. The placeholder in the query is defined as follows: <ROLE>: The name of the role that you want to create. use role useradmin; create role <ROLE>; (Optional) If you want to create a new database and schema for Decodable to work with, run the following queries. The placeholders in the queries are defined as follows: <DATABASE>: The name of the database that you want to create. <SCHEMA>: The name of the schema that you want to create. use role sysadmin; create database <DATABASE>; create schema <DATABASE>.<SCHEMA>; (Optional) When you create a connection between Decodable and Snowflake, Decodable will create the sink tables in Snowflake for you. However, you can also create the sink tables in Snowflake ahead of time if you prefer. Run the following query to create the sink tables. The placeholders in the queries are defined as follows: <DATABASE>: The name of the database containing the table that you want to send data to. <SCHEMA>: The name of the schema containing the table that you want to send data to. <TABLE>: The name of the existing tables that you want to send data to. <COL_N> <TYPE_N>: The name and Snowflake data type of the Nth column, respectively. use role sysadmin; create table <DATABASE>.<SCHEMA>.<TABLE> ( <COL_1> <TYPE_1>, <COL_2> <TYPE_2>, ... ); Run the following queries to create, configure, and assign permissions. The placeholders in the queries are defined as follows: <DATABASE>: The name of the database containing the tables that you want to send data to. If you chose to create a database in Step 3, then this will be the name of the new database. <SCHEMA>: The name of the schema containing the tables that you want to send data to. If you chose to create a schema in Step 3, then this will be the name of the new schema. <TABLE>: The name of the tables that you want to send data to. This will be the name of the tables created in Step 4. <USER>: The name of the user that you assigned the public key to in the “Generate a private and public key pair for authentication” step. <ROLE>: The name of the role that’s receiving operating permissions. If you chose to create a role in Step 2, then this will be the name of the new role. -- Switch to a role that can create and manage roles and privileges. use role securityadmin; -- Add privileges to the role. grant usage on database <DATABASE> to role <ROLE>; grant usage on schema <SCHEMA> to role <ROLE>; grant create schema on database <DATABASE> to role <ROLE>; // Only required if you are working with change streams grant create table on schema <SCHEMA> to role <ROLE>; grant select, insert, update, delete on table <TABLE> to role <ROLE>; // Only required if you are pre-creating the sink tables -- Grant the custom role to the user that you previously assigned the public key to. grant role <ROLE> to user <USER>; Role permissions summary If you are sending data from an append stream you will require these privileges INSERT privileges on the Snowflake table. USAGE privileges on the schema and database. If you are sending data from change streams, you must additionally have the following privileges INSERT, UPDATE, SELECT, DELETE privileges on the Snowflake table. USAGE privileges on the warehouse. CREATE TABLE privileges on the schema. Any table-level permissions that are required are automatically applied if Decodable creates the sink tables for you. Create and set up a Snowflake Warehouse For append-streams, Decodable uses warehouses to create the sink tables during connection creation. For change-streams, warehouses are used to continuously merge data into the sink Snowflake tables. This may incur significant usage on your Snowflake warehouse, depending on your data volume and configured merge interval. To send data from Decodable to Snowflake, you must either create a new warehouse in Snowflake or designate an existing warehouse to use that you have sufficient permissions for. We recommend that you set up a new warehouse in order to audit usage and Snowflake costs incurred by Decodable. Create a new warehouse in Snowflake. The placeholder in the query is defined as follows: <WAREHOUSE>: The name of the warehouse that you want to create. use role sysadmin; CREATE WAREHOUSE <WAREHOUSE> WITH WAREHOUSE_SIZE='X-SMALL' INITIALLY_SUSPENDED=TRUE AUTO_RESUME=TRUE AUTO_SUSPEND=60; Assign permissions to a role by running the following queries. The placeholders in the queries are defined as follows: <SCHEMA>: The name of the schema containing the table that you want to send data to. <WAREHOUSE>: The name of the warehouse that will be used to merge change stream data into your sink table. If you chose to create a warehouse in Step 1, then this will be the name of that warehouse. <TABLE>: The names of the existing tables that you want to send data to. <ROLE>: The name of the role that’s receiving operating permissions. use role securityadmin; grant usage on warehouse <WAREHOUSE> to role <ROLE>; Table names By default, Decodable uses the stream name as the name of the table it writes to. If a table already exists with that name and the schema of the stream matches the schema of the table, Decodable will write to the existing table. If it doesn’t exist, Decodable will create it. You can change the name of the table to which Decodable writes either in the web interface, or by using output-resource-name-template when calling decodable connection scan. The schema of each stream is automatically translated to Snowflake, including: field names data types (See data types for how Decodable types map to Snowflake types) primary keys Writing data to multiple tables A single instance of this connector can write to multiple tables in a single schema and database If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to write to decodable connection scan. Resource specifier keys When using the decodable connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available: Name Description snowflake.table The table name CDC and Snowflake warehouse When you configure Decodable to send change data capture records to Snowflake, Decodable automatically creates and manages a Snowflake staging table, for each sink table, where changes are first loaded. Then, depending on the merge interval setting, changes are merged from the staging table into the specified sink table. The staging table is named <decodable-account-id>_<connection-id>_<sink-schema>__<sink-table>. These staging tables are all stored in a single shared schema named DECODABLE_STAGING. How does restarting or re-creating the Snowflake connection affect read consistency? Eventually-consistent reads: Decodable doesn’t automatically clean up or delete staging tables when you force-restart or recreate the Snowflake Connector. Either of these actions may result in eventually-consistent reads while the restarted or re-created Snowflake connection works to catch back up. If you can’t tolerate eventually-consistent reads, then you should manually clear the staging table before force-restarting or re-creating the connection. Change Stream materialization interval This setting is only applicable if you are using a change stream. Decodable continuously streams new records in real time to staging tables in Snowflake, and periodically merges the contents of the staging tables to their respective sink tables. The materialization interval is what controls the frequency of the merge. The materialization interval indicates a lower bound for how frequently the merging process will run. It’s a lower bound because merges can’t happen more frequently than the underlying Flink job checkpointing (by default, every 10 seconds). When Decodable streams records to the staging table it doesn’t invoke the warehouse at all, and is metered primarily by data volume. The merge process takes the existing records in the staging tables, merges them into the sink tables using the warehouse (thereby waking it up), and deletes those merged records from the staging table. Once the merge is complete, the warehouse won’t be used, and can sleep, until the next merge process kicks off. The merge process should only wake up the warehouse if there are records to be merged—it will no-op if no new records have been encountered since the last merge, and will check again for new records on the next scheduled merge attempt. You should set the interval based on your requirements and the following considerations: The shorter the interval, the greater the use of your Snowflake warehouse and associated costs. The longer the interval, the greater the latency of data arriving in the sink table The interval should be longer than the time that it takes for the merge to occur. If a merge takes longer than the materialization interval then the next merge will run immediately, and thus merges can pile up and be observed to be running continuously. You can alleviate this issue (and the associated additional warehouse costs in Snowflake) by setting a larger value for the materialization interval. Specifying your Snowflake account name When configuring this connector you must provide your Snowflake account name in the specific format <organization>-<account_name>. If you don’t know the value for these you can find them in the web interface of Snowflake: Consult the Snowflake documentation to learn more about account identifiers. Connector starting state and offsets A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data that’s written to the stream when the connection has started will be sent to the external system. You can override this when you start the connection to Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream. When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above. Learn more about starting state here. Data types mapping The following table describes the mapping of Decodable data types to their Snowflake data type counterparts. Decodable Type Snowflake Type CHAR CHARACTER VARCHAR/STRING VARCHAR BOOLEAN BOOLEAN BINARY BINARY VARBINARY, BYTES VARBINARY DECIMAL, DEC, NUMERIC NUMBER TINYINT TINYINT SMALLINT SMALLINT INT INT BIGINT BIGINT FLOAT FLOAT DOUBLE DOUBLE DATE DATE TIME TIME TIMESTAMP TIMESTAMP_NTZ TIMESTAMP_LTZ TIMESTAMP_LTZ ARRAY ARRAY MAP OBJECT ROW VARIANT MULTISET Not supported INTERVAL Not supported