Snowflake sink connector

Features

Connector name

snowflake

Supported task sizes

S, M, L

A single instance of this connector can write to multiple tables in a single schema and database

Supported stream types

Although the Snowflake connection supports both types of stream, a single Snowflake connection may only use one type of stream. If you want to send data from both change streams and append streams, then you must create separate Snowflake connections.

Configuration properties

Property Description Required Default

snowflake.database

The name of the database containing the tables that you want to send data to.

Yes

snowflake.schema

The name of the schema containing the tables that you want to send data to.

Yes

snowflake.user

The name of the user sending data to the Snowflake table.

The user must be granted usage permissions on the provided role.

Yes

snowflake.private-key

The private key that you generated in the prerequisites.

This must be provided as a secret resource.

Make sure that when you create the Decodable secret you do the following:

  • The key must be unencrypted

  • Do not include the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header and footers

Yes

snowflake.role

The name of the role to use for all Snowflake actions.

Ensure that the role has the necessary permissions.

Yes

snowflake.account-name

The name of your Snowflake account.

This must be formatted as <organization>-<name>.

If you are using the Snowsight web interface, select Admin > Accounts. The organization is listed above the account name.

If you are using the classic web interface, select the user profile in the dropdown to reveal the organization ID and account name.

Yes

snowflake.warehouse

The name of the Snowflake warehouse used to perform table creation, and data merging for change streams.

Yes

snowflake.merge-interval

Change records are sent to the sink table in batches. Define the interval to wait before sending the change records into the table.

For example, 1 minute, 5h, 100 sec.

Table 1. Supported time unit labels
Unit Accepted labels

Days

d, day

Hours

h, hour

Minutes

min, minute

Seconds

s, sec, second

Milliseconds

ms, milli, millisecond

Microseconds

µs, micro, microsecond

Nanoseconds

ns, nano, nanosecond

For further explanation of this setting see Change Stream Materialization Interval.

Only applicable if you are using a change stream

Prerequisites

Generate a private and public key pair for authentication

Perform the following steps to generate a private and public key to secure communications between Decodable and Snowflake.

  1. Open a command line interface.

  2. Generate an unencrypted private key by running the following command.

    openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8 -nocrypt
    • The Snowflake connector expects an unencrypted private key. If you want to use an encrypted private key, omit the -nocrypt option from the above command. However, you will need to decrypt it before attaching it to the Snowflake connector. You can decrypt the private key with the following command:

      openssl rsa -in rsa_key.p8 -out decrypted_rsa_key.p8
  3. Generate a public key by running the following command. For this step, it doesn’t matter whether your private key was encrypted or not.

    openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  4. Copy the public key that you generated in the previous step.

    Copy only the part of the key between the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters.

    You can do this manually, or use the following command to print the required portion of the key:

    cat <KEY_FILE> | sed '1d;$d'
  5. (Optional) If you want to create a new user in Snowflake to use as a Decodable service account, run the following query. The placeholder in the query is defined as follows:

    <USER>: The name of the user that you want to create.

    use role useradmin;
    CREATE USER <USER>;
  6. Assign the public key to an existing user in Snowflake by running the following queries in a Snowflake worksheet. The placeholders in the query are defined as follows:

    • <USER>: The name of the user that you want to assign the public key to. If you performed step 5, this value will be the user that you created.

    • <PUBLIC_KEY>: The public key that you copied in Step 4. The -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters must be excluded.

      use role securityadmin;
      alter user "<USER>" set RSA_PUBLIC_KEY='<PUBLIC_KEY>';

Create, configure, and assign a role

In this step, you’ll create, configure, and assign a custom Snowflake role with sufficient privileges to your Snowflake user. This enables Decodable’s Snowflake connector to be able to interact with the Snowflake tables.

  1. Open a Snowflake worksheet.

  2. (Optional) If you want to create a new role in Snowflake to use as the Decodable service account role, run the following query. The placeholder in the query is defined as follows:

    • <ROLE>: The name of the role that you want to create.

    use role useradmin;
    create role <ROLE>;
  3. (Optional) If you want to create a new database and schema for Decodable to work with, run the following queries. The placeholders in the queries are defined as follows:

    • <DATABASE>: The name of the database that you want to create.

    • <SCHEMA>: The name of the schema that you want to create.

    use role sysadmin;
    create database <DATABASE>;
    create schema <DATABASE>.<SCHEMA>;
  4. (Optional) When you create a connection between Decodable and Snowflake, Decodable will create the sink tables in Snowflake for you. However, you can also create the sink tables in Snowflake ahead of time if you prefer. Run the following query to create the sink tables. The placeholders in the queries are defined as follows:

    • <DATABASE>: The name of the database containing the table that you want to send data to.

    • <SCHEMA>: The name of the schema containing the table that you want to send data to.

    • <TABLE>: The name of the existing tables that you want to send data to.

    • <COL_N> <TYPE_N>: The name and Snowflake data type of the Nth column, respectively.

      use role sysadmin;
      create table <DATABASE>.<SCHEMA>.<TABLE> (
        <COL_1> <TYPE_1>,
        <COL_2> <TYPE_2>,
        ...
      );
  5. Run the following queries to create, configure, and assign permissions. The placeholders in the queries are defined as follows:

    • <DATABASE>: The name of the database containing the tables that you want to send data to. If you chose to create a database in Step 3, then this will be the name of the new database.

    • <SCHEMA>: The name of the schema containing the tables that you want to send data to. If you chose to create a schema in Step 3, then this will be the name of the new schema.

    • <TABLE>: The name of the tables that you want to send data to. This will be the name of the tables created in Step 4.

    • <USER>: The name of the user that you assigned the public key to in the “Generate a private and public key pair for authentication” step.

    • <ROLE>: The name of the role that’s receiving operating permissions. If you chose to create a role in Step 2, then this will be the name of the new role.

    -- Switch to a role that can create and manage roles and privileges.
    use role securityadmin;
    
    -- Add privileges to the role.
    grant usage on database <DATABASE> to role <ROLE>;
    grant usage on schema <SCHEMA> to role <ROLE>;
    grant create schema on database <DATABASE> to role <ROLE>; // Only required if you are working with change streams
    grant create table on schema <SCHEMA> to role <ROLE>;
    grant select, insert, update, delete on table <TABLE> to role <ROLE>; // Only required if you are pre-creating the sink tables
    
    -- Grant the custom role to the user that you previously assigned the public key to.
    grant role <ROLE> to user <USER>;

Role permissions summary

If you are sending data from an append stream you will require these privileges

  • INSERT privileges on the Snowflake table.

  • USAGE privileges on the schema and database.

If you are sending data from change streams, you must additionally have the following privileges

  • INSERT, UPDATE, SELECT, DELETE privileges on the Snowflake table.

  • USAGE privileges on the warehouse.

  • CREATE TABLE privileges on the schema.

Any table-level permissions that are required are automatically applied if Decodable creates the sink tables for you.

Create and set up a Snowflake Warehouse

For append-streams, Decodable uses warehouses to create the sink tables during connection creation. For change-streams, warehouses are used to continuously merge data into the sink Snowflake tables. This may incur significant usage on your Snowflake warehouse, depending on your data volume and configured merge interval.

To send data from Decodable to Snowflake, you must either create a new warehouse in Snowflake or designate an existing warehouse to use that you have sufficient permissions for. We recommend that you set up a new warehouse in order to audit usage and Snowflake costs incurred by Decodable.

  1. Create a new warehouse in Snowflake. The placeholder in the query is defined as follows:

    • <WAREHOUSE>: The name of the warehouse that you want to create.

      use role sysadmin;
      CREATE WAREHOUSE <WAREHOUSE> WITH WAREHOUSE_SIZE='X-SMALL' INITIALLY_SUSPENDED=TRUE AUTO_RESUME=TRUE AUTO_SUSPEND=60;
  2. Assign permissions to a role by running the following queries. The placeholders in the queries are defined as follows:

    • <SCHEMA>: The name of the schema containing the table that you want to send data to.

    • <WAREHOUSE>: The name of the warehouse that will be used to merge change stream data into your sink table. If you chose to create a warehouse in Step 1, then this will be the name of that warehouse.

    • <TABLE>: The names of the existing tables that you want to send data to.

    • <ROLE>: The name of the role that’s receiving operating permissions.

    use role securityadmin;
    
    grant usage on warehouse <WAREHOUSE> to role <ROLE>;

Table names

By default, Decodable uses the stream name as the name of the table it writes to. If a table already exists with that name and the schema of the stream matches the schema of the table, Decodable will write to the existing table. If it doesn’t exist, Decodable will create it.

You can change the name of the table to which Decodable writes either in the web interface, or by using output-resource-name-template when calling decodable connection scan.

The schema of each stream is automatically translated to Snowflake, including:

  • field names

  • data types (See data types for how Decodable types map to Snowflake types)

  • primary keys

Writing data to multiple tables

A single instance of this connector can write to multiple tables in a single schema and database

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to write to decodable connection scan.

Resource specifier keys

When using the decodable connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

Name Description

snowflake.table

The table name

CDC and Snowflake warehouse

When you configure Decodable to send change data capture records to Snowflake, Decodable automatically creates and manages a Snowflake staging table, for each sink table, where changes are first loaded. Then, depending on the merge interval setting, changes are merged from the staging table into the specified sink table. The staging table is named <decodable-account-id>_<connection-id>_<sink-schema>__<sink-table>. These staging tables are all stored in a single shared schema named DECODABLE_STAGING.

How does restarting or re-creating the Snowflake connection affect read consistency?

Eventually-consistent reads: Decodable doesn’t automatically clean up or delete staging tables when you force-restart or recreate the Snowflake Connector. Either of these actions may result in eventually-consistent reads while the restarted or re-created Snowflake connection works to catch back up.

If you can’t tolerate eventually-consistent reads, then you should manually clear the staging table before force-restarting or re-creating the connection.

Change Stream materialization interval

This setting is only applicable if you are using a change stream.

Decodable continuously streams new records in real time to staging tables in Snowflake, and periodically merges the contents of the staging tables to their respective sink tables. The materialization interval is what controls the frequency of the merge. The materialization interval indicates a lower bound for how frequently the merging process will run. It’s a lower bound because merges can’t happen more frequently than the underlying Flink job checkpointing (by default, every 10 seconds).

When Decodable streams records to the staging table it doesn’t invoke the warehouse at all, and is metered primarily by data volume.

The merge process takes the existing records in the staging tables, merges them into the sink tables using the warehouse (thereby waking it up), and deletes those merged records from the staging table. Once the merge is complete, the warehouse won’t be used, and can sleep, until the next merge process kicks off. The merge process should only wake up the warehouse if there are records to be merged—​it will no-op if no new records have been encountered since the last merge, and will check again for new records on the next scheduled merge attempt.

You should set the interval based on your requirements and the following considerations:

  1. The shorter the interval, the greater the use of your Snowflake warehouse and associated costs.

  2. The longer the interval, the greater the latency of data arriving in the sink table

  3. The interval should be longer than the time that it takes for the merge to occur.

    • If a merge takes longer than the materialization interval then the next merge will run immediately, and thus merges can pile up and be observed to be running continuously. You can alleviate this issue (and the associated additional warehouse costs in Snowflake) by setting a larger value for the materialization interval.

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data that’s written to the stream when the connection has started will be sent to the external system. You can override this when you start the connection to Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.

Data types mapping

The following table describes the mapping of Decodable data types to their Snowflake data type counterparts.

Decodable Type Snowflake Type

CHAR

CHARACTER

VARCHAR/STRING

VARCHAR

BOOLEAN

BOOLEAN

BINARY

BINARY

VARBINARY, BYTES

VARBINARY

DECIMAL, DEC, NUMERIC

NUMBER

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP_NTZ

TIMESTAMP_LTZ

TIMESTAMP_LTZ

ARRAY

ARRAY

MAP

OBJECT

ROW

VARIANT

MULTISET

Not supported

INTERVAL

Not supported