Snowflake sink connector

Use the Snowflake connector to send data from Decodable to Snowflake tables. It can send data from both append and change streams into a Snowflake table, without the need for third-party infrastructure.

Features

Delivery guarantee

  • Append streams: At least once

  • Change streams: Exactly once

Read from multiple streams

Yes

Although the Snowflake connector supports both append and change streams, a single Snowflake connection must consist entirely of append streams or entirely of change streams. If you want to send data from both change streams and append streams, you must create a separate Snowflake connection for each stream type.

Prerequisites: Prepare Snowflake for connections

Before you send data from Decodable into Snowflake, do the following in your Snowflake account to make sure that you are able to create a connection to it.

Generate a private and public key pair for authentication

Perform the following steps to generate a private and public key to secure communications between Decodable and Snowflake.

  1. Open a command line interface.

  2. Generate an unencrypted private key by running the following command.

    openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8 -nocrypt
    • The Snowflake connector expects an unencrypted private key. If you generated an encrypted private key instead (by omitting the -nocrypt option from the command above), you must decrypt it before attaching it to the Snowflake connection. You can decrypt the private key with the following command:

      openssl rsa -in rsa_key.p8 -out decrypted_rsa_key.p8
  3. Generate a public key by running the following command. For this step, it doesn’t matter whether your private key was encrypted or not.

    openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  4. Copy the public key that you generated in Step 3. Copy only the part of the key between the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters. You can do this manually, or use the following command to print the required portion of the key:

    cat <KEY_FILE> | sed '1d;$d'
  5. (Optional) If you want to create a new user in Snowflake to use as a Decodable service account, run the following query. The placeholder in the query is defined as follows:

    • <USER>: The name of the user that you want to create.

    use role useradmin;
    create user <USER>;
  6. Assign the public key to an existing user in Snowflake by running the following queries in a Snowflake worksheet. The placeholders in the query are defined as follows:

    • <USER>: The name of the user that you want to assign the public key to. If you performed step 5, this value will be the user that you created.

    • <PUBLIC_KEY>: The public key that you copied in Step 4. The -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters must be excluded.

      use role securityadmin;
      alter user "<USER>" set RSA_PUBLIC_KEY='<PUBLIC_KEY>';
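    To confirm that the public key was assigned, you can describe the user and check that the RSA_PUBLIC_KEY_FP property is populated. For example, with a hypothetical user named decodable_svc:

      -- Describe the user; the output includes the RSA_PUBLIC_KEY_FP fingerprint.
      use role securityadmin;
      desc user decodable_svc;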

Create, configure, and assign a role

In this step, you’ll create, configure, and assign a custom Snowflake role with sufficient privileges to your Snowflake user. This enables Decodable’s Snowflake connector to interact with your Snowflake tables.

  1. Open a Snowflake worksheet.

  2. (Optional) If you want to create a new role in Snowflake to use as the Decodable service account role, run the following query. The placeholder in the query is defined as follows:

    • <ROLE>: The name of the role that you want to create.

    use role useradmin;
    create role <ROLE>;
  3. (Optional) If you want to create a new database and schema for Decodable to work with, run the following queries. The placeholders in the queries are defined as follows:

    • <DATABASE>: The name of the database that you want to create.

    • <SCHEMA>: The name of the schema that you want to create.

    use role sysadmin;
    create database <DATABASE>;
    create schema <DATABASE>.<SCHEMA>;
  4. (Optional) When you create a connection between Decodable and Snowflake, Decodable creates the sink tables in Snowflake for you. However, you can also create the sink tables in Snowflake ahead of time if you prefer. Run the following query for each sink table that you want to create. The placeholders in the query are defined as follows:

    • <DATABASE>: The name of the database containing the table that you want to send data to.

    • <SCHEMA>: The name of the schema containing the table that you want to send data to.

    • <TABLE>: The name of the table that you want to create and send data to.

    • <COL_N> <TYPE_N>: The name and Snowflake data type of the Nth column, respectively.

      use role sysadmin;
      create table <DATABASE>.<SCHEMA>.<TABLE> (
        <COL_1> <TYPE_1>,
        <COL_2> <TYPE_2>,
        ...
      );
  5. Run the following queries to create, configure, and assign permissions. The placeholders in the queries are defined as follows:

    • <DATABASE>: The name of the database containing the tables that you want to send data to. If you chose to create a database in Step 3, then this will be the name of the new database.

    • <SCHEMA>: The name of the schema containing the tables that you want to send data to. If you chose to create a schema in Step 3, then this will be the name of the new schema.

    • <TABLE>: The name of the tables that you want to send data to. If you created tables in Step 4, this will be the name of those tables.

    • <USER>: The name of the user that you assigned the public key to in the “Generate a private and public key pair for authentication” step.

    • <ROLE>: The name of the role that’s receiving operating permissions. If you chose to create a role in Step 2, then this will be the name of the new role.

    -- Switch to a role that can create and manage roles and privileges.
    use role securityadmin;
    
    -- Add privileges to the role.
    grant usage on database <DATABASE> to role <ROLE>;
    grant usage on schema <DATABASE>.<SCHEMA> to role <ROLE>;
    grant create schema on database <DATABASE> to role <ROLE>; -- Only required if you are working with change streams
    grant create table on schema <DATABASE>.<SCHEMA> to role <ROLE>;
    grant select, insert, update, delete on table <DATABASE>.<SCHEMA>.<TABLE> to role <ROLE>; -- Only required if you are pre-creating the sink tables
    
    -- Grant the custom role to the user that you previously assigned the public key to.
    grant role <ROLE> to user <USER>;
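As a worked example, the grants above with hypothetical names filled in (database analytics, schema events, table users, role decodable_role, user decodable_svc) would look like:

    use role securityadmin;
    grant usage on database analytics to role decodable_role;
    grant usage on schema analytics.events to role decodable_role;
    grant create schema on database analytics to role decodable_role; -- change streams only
    grant create table on schema analytics.events to role decodable_role;
    grant select, insert, update, delete on table analytics.events.users to role decodable_role; -- pre-created sink tables only
    grant role decodable_role to user decodable_svc;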

Create and set up a Snowflake Warehouse

For append streams, Decodable uses warehouses to create the sink tables during connection creation. For change streams, warehouses are used to continuously merge data into the sink Snowflake tables. This may incur significant usage on your Snowflake warehouse, depending on your data volume and configured merge interval.

To send data from Decodable to Snowflake, you must either create a new warehouse in Snowflake or designate an existing warehouse for which you have sufficient permissions. We recommend setting up a new warehouse so that you can audit the usage and Snowflake costs incurred by Decodable.

  1. Create a new warehouse in Snowflake. The placeholder in the query is defined as follows:

    • <WAREHOUSE>: The name of the warehouse that you want to create.

      use role sysadmin;
      create warehouse <WAREHOUSE> with warehouse_size='X-SMALL' initially_suspended=true auto_resume=true auto_suspend=60;
  2. Assign usage permissions on the warehouse to a role by running the following queries. The placeholders in the queries are defined as follows:

    • <WAREHOUSE>: The name of the warehouse that will be used to create sink tables and to merge change stream data into them. If you chose to create a warehouse in Step 1, then this will be the name of that warehouse.

    • <ROLE>: The name of the role that’s receiving operating permissions.

    use role securityadmin;
    
    grant usage on warehouse <WAREHOUSE> to role <ROLE>;
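For example, with a hypothetical warehouse named decodable_wh and role named decodable_role, the two steps together would be:

    use role sysadmin;
    create warehouse decodable_wh with warehouse_size='X-SMALL' initially_suspended=true auto_resume=true auto_suspend=60;

    use role securityadmin;
    grant usage on warehouse decodable_wh to role decodable_role;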

Steps

If you want to use the Decodable CLI or API to create the connection, refer to the Property Name column for the underlying property names. The connector name is snowflake.
  1. From the Connections page, select the Snowflake connector and complete the following fields.

    Database (snowflake.database)
    The name of the database containing the tables that you want to send data to.

    Schema (snowflake.schema)
    The name of the schema containing the tables that you want to send data to.

    User (snowflake.user)
    The name of the user sending data to the Snowflake table. The user must be granted usage permissions on the provided role.

    Private Key (snowflake.private-key)
    The secret associated with the private key that you generated in the prerequisites. When you create the secret containing your private key, make sure that the key is unencrypted and that the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header and footer are excluded. See Generate a private and public key pair for authentication.

    If you are using the Decodable CLI, this is the ID of the secret resource corresponding to the private key. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Role (snowflake.role)
    The name of the role to use for all Snowflake actions. The role must be granted the following privileges.

    If you are sending data from an append stream:

    • INSERT privileges on the Snowflake table.

    • USAGE privileges on the schema and database.

    If you are sending data from change streams, the role must additionally have the following privileges:

    • INSERT, UPDATE, SELECT, DELETE privileges on the Snowflake table.

    • USAGE privileges on the warehouse.

    • CREATE TABLE privileges on the schema.

    Note: Any table-level permissions that are required are automatically applied if Decodable creates the sink tables for you.

    Account Name (snowflake.account-name)
    The name of your Snowflake account, formatted as <organization>-<name>.

    If you are using the Snowsight web interface, select Admin > Accounts. The organization is listed above the account name. If you are using the classic web interface, select the user profile dropdown to reveal the organization ID and account name.

    Change Stream Materialization Interval (snowflake.merge-interval)
    Only applicable if you are using a change stream.

    Change records are sent to the sink table in batches. Define the interval to wait before sending the change records into the table. For example: 1 minute, 5h, 100 sec.

    Table 1. Supported time unit labels

    Unit          Accepted labels
    Days          d, day
    Hours         h, hour
    Minutes       min, minute
    Seconds       s, sec, second
    Milliseconds  ms, milli, millisecond
    Microseconds  µs, micro, microsecond
    Nanoseconds   ns, nano, nanosecond

    For further explanation of this setting see Change Stream Materialization Interval.

    Warehouse (snowflake.warehouse)
    The name of the Snowflake warehouse used for sink table creation and, for change streams, data merging.

  2. Select Next. The next screen shows a list of streams that Decodable can send to Snowflake.

  3. Select the streams containing the data that you’d like to send to Snowflake. Decodable automatically creates a Snowflake table for each stream selected, and performs schema mapping between the Decodable stream and the Snowflake table. See Data types mapping for how Decodable types map to Snowflake types. Then, select Next.

  4. (Optional) Give the sink tables a name. By default, Decodable gives a sink table the same name as its stream. For example, if you are sending data from a stream called users, then the sink Snowflake table is named users.

    • If you want to rename a generated table, override the generated name.

    • If the sink table already exists in Snowflake and has the same schema as the incoming stream, then Decodable sends data to the existing table.

  5. When you are ready to continue, select Next. At this point, Decodable creates the sink tables in Snowflake.

  6. Give the newly created connection a Name and Description and select Save.

Be aware that even if you decide not to save this connection, the sink tables created by Decodable in Step 5 still exist. If you don’t want these tables, delete them in Snowflake.

You can now use this connection to send a stream of records to a given Snowflake table without the need for additional infrastructure such as Snowflake merge tasks or staging data in S3 first. When processing CDC data, data is first written to a staging table in Snowflake. The connector will automatically merge these changes into the target table at the merge interval you specify. When ingesting append- or insert-only data, a staging table isn’t needed. Decodable will directly ingest into the target table.

Working with change data capture records and Snowflake Warehouse notes

When you configure Decodable to send change data capture records to Snowflake, Decodable automatically creates and manages a Snowflake staging table for each sink table, where changes are first loaded. Then, depending on the merge interval setting, changes are merged from the staging table into the specified sink table. The staging table is named <decodable-account-id>_<connection-id>_<sink-schema>__<sink-table>. These staging tables are all stored in a single shared schema named DECODABLE_STAGING.

How does restarting or re-creating the Snowflake connection affect read consistency?

Eventually-consistent reads: Decodable doesn’t automatically clean up or delete staging tables when you force-restart or re-create the Snowflake connection. Either of these actions may result in eventually-consistent reads while the restarted or re-created Snowflake connection catches up.

If you can’t tolerate eventually-consistent reads, then you should manually clear the staging table before force-restarting or re-creating the connection.
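If you do need to clear a staging table manually, you can truncate it in Snowflake. A sketch, assuming the DECODABLE_STAGING schema lives in your sink database and that your role has the required privileges; the quoted staging table name follows the pattern described above:

    -- List the staging tables, then truncate the one for your connection.
    show tables in schema <DATABASE>.DECODABLE_STAGING;
    truncate table <DATABASE>.DECODABLE_STAGING."<decodable-account-id>_<connection-id>_<sink-schema>__<sink-table>";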

Change Stream Materialization Interval

This setting is only applicable if you are using a change stream.

Decodable continuously streams new records in real time to staging tables in Snowflake, and periodically merges the contents of the staging tables to their respective sink tables. The materialization interval is what controls the frequency of the merge. The materialization interval indicates a lower bound for how frequently the merging process will run. It’s a lower bound because merges can’t happen more frequently than the underlying Flink job checkpointing (by default, every 10 seconds).

When Decodable streams records to the staging table, it doesn’t invoke the warehouse at all; this ingestion is metered primarily by data volume.

The merge process takes the existing records in the staging tables, merges them into the sink tables using the warehouse (thereby waking it up), and deletes the merged records from the staging tables. Once the merge is complete, the warehouse isn’t used, and can suspend, until the next merge process kicks off. The merge process only wakes the warehouse if there are records to merge; it’s a no-op if no new records have arrived since the last merge, and it checks again for new records on the next scheduled merge attempt.
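Conceptually, the merge step behaves like a Snowflake MERGE from the staging table into the sink table. The following is an illustrative sketch only, using hypothetical table names, an assumed "op" change-type column, and assumed values; the connector generates and manages its own SQL, which may differ:

    -- Hypothetical sketch of merging staged changes into a sink table.
    merge into analytics.events.users as sink
    using analytics.DECODABLE_STAGING."<staging-table>" as staged
      on sink.id = staged.id
    when matched and staged.op = 'delete' then delete
    when matched then update set sink.name = staged.name
    when not matched and staged.op <> 'delete' then
      insert (id, name) values (staged.id, staged.name);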

You should set the interval based on your requirements and the following considerations:

  1. The shorter the interval, the greater the use of your Snowflake warehouse and associated costs.

  2. The longer the interval, the greater the latency of data arriving in the sink table.

  3. The interval should be longer than the time that it takes for the merge to occur.

    • If a merge takes longer than the materialization interval then the next merge will run immediately, and thus merges can pile up and be observed to be running continuously. You can alleviate this issue (and the associated additional warehouse costs in Snowflake) by setting a larger value for the materialization interval.

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started will be sent to the external system. You can override this when you start the connection, choosing Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.

Data types mapping

The following table describes the mapping of Decodable data types to their Snowflake data type counterparts.

Decodable Type           Snowflake Type
CHAR                     CHARACTER
VARCHAR/STRING           VARCHAR
BOOLEAN                  BOOLEAN
BINARY                   BINARY
VARBINARY, BYTES         VARBINARY
DECIMAL, DEC, NUMERIC    NUMBER
TINYINT                  TINYINT
SMALLINT                 SMALLINT
INT                      INT
BIGINT                   BIGINT
FLOAT                    FLOAT
DOUBLE                   DOUBLE
DATE                     DATE
TIME                     TIME
TIMESTAMP                TIMESTAMP_NTZ
TIMESTAMP_LTZ            TIMESTAMP_LTZ
ARRAY                    ARRAY
MAP                      OBJECT
ROW                      VARIANT
MULTISET                 Not supported
INTERVAL                 Not supported
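As an illustration of this mapping, a Decodable stream with hypothetical fields id INT, name STRING, and created_at TIMESTAMP would correspond to a Snowflake table like:

    create table <DATABASE>.<SCHEMA>.users (
      id int,                    -- INT maps to INT
      name varchar,              -- STRING maps to VARCHAR
      created_at timestamp_ntz   -- TIMESTAMP maps to TIMESTAMP_NTZ
    );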

Resource specifier keys

When using the connection scan command of the Decodable CLI to create a connection specification, the following resource specifier keys are available:

Name             Description
snowflake.table  The table name.