Ingest data into your data warehouse, data lake, or OLAP database

This guide walks you through ingesting time series and event-oriented data from an event streaming platform into your data warehouse, data lake, or OLAP database for analytics. By "event-oriented data," we mean product orders, market trades, HTTP request and response events, or other kinds of application events.

This guide uses Apache Kafka as the data source and Snowflake as the data sink, but you can substitute your platforms of choice. For example, you can use a different event streaming platform such as Redpanda, Apache Pulsar, Amazon Kinesis, or Confluent Cloud. Similarly, other common sinks include Amazon S3, Delta Lake, ClickHouse, and Apache Pinot.

You can find a full list of available connectors here, or in the navigation menu on the left.

Let’s go! 🚀

Prerequisites

Before you get started, here are a few pieces of information you’ll need to have handy.

  • Your Kafka broker connection information. This includes the bootstrap brokers, authentication method supported by your deployment, and the authentication credentials required to connect. Additionally, you’ll need the topic names from which you want to ingest data.

  • Your Snowflake account name, warehouse, role, and authentication credentials. You’ll also need the database, schema, and table names to which you want to write data.

In addition, make sure that you have allowed connections from Decodable’s IPs. Otherwise, Decodable will be unable to connect to your brokers.

Guide

Step 1: Create and start a source connection

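Go to Connections and select New Connection. Connections move data between external systems and Decodable streams: a source connection reads data from a data source into a stream, and a sink connection writes data from a stream to a data sink.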

  1. Configure the connection

    Select Apache Kafka (or your system of choice). Then, choose Source as the connection type and provide the required connection information such as the list of bootstrap brokers, authentication method, credentials, and any other optional connection parameters.

    The Value Format parameter controls how Decodable deserializes data from the Kafka topic. Make sure to select the format that matches your data.

    If Decodable encounters data in a format other than what’s configured, it will follow the Parse Error Policy method you select. Selecting Fail causes Decodable to fail the connection and stop processing until the issue is corrected, while Ignore causes the system to skip and discard the malformed record.
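The difference between the two policies can be sketched in a few lines of Python. This is an illustration only, not Decodable's implementation: it assumes JSON as the value format, and the function name `deserialize` and the policy strings `"FAIL"` and `"IGNORE"` are hypothetical.

```python
import json


def deserialize(raw_records, parse_error_policy="FAIL"):
    """Decode JSON records, applying a Fail or Ignore style policy to bad input."""
    decoded = []
    for raw in raw_records:
        try:
            decoded.append(json.loads(raw))
        except json.JSONDecodeError as err:
            if parse_error_policy == "FAIL":
                # Fail: stop processing until the problem is corrected.
                raise RuntimeError(f"malformed record: {raw!r}") from err
            # Ignore: skip and discard the malformed record.
            continue
    return decoded


records = ['{"order_id": 1}', "not-json", '{"order_id": 2}']
ignored = deserialize(records, parse_error_policy="IGNORE")
print(ignored)  # the malformed middle record is dropped
```

With the Ignore behavior, a value format that doesn't match the data at all produces no output and no error, which is worth keeping in mind when troubleshooting later.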

    By default, Kafka connections operate in exactly-once delivery mode in Decodable. For more information about the other Apache Kafka options, see Apache Kafka.

    Select Next.

  2. Choose or create a stream

    Streams transport records within Decodable. You can select a preexisting stream to which data should be written, or you can create a new stream.

    If you’re creating a new stream, give it a name. Names must be unique within an account and can contain lowercase letters, numbers, and underscores ([a-z0-9_]+). This is how you’ll refer to your data in SQL queries and custom pipelines written in Java. As a best practice, give each stream a description that helps you and your team remember what data it contains and how it should be used.
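If you generate stream names in scripts, you can check the character rule locally before creating the stream. The sketch below applies the pattern from the naming rule; the helper name `is_valid_stream_name` is hypothetical, and it can't check uniqueness within your account.

```python
import re

# Pattern taken from the naming rule: lowercase letters, numbers, underscores.
STREAM_NAME = re.compile(r"[a-z0-9_]+")


def is_valid_stream_name(name: str) -> bool:
    """Return True if the whole name matches the allowed character set."""
    return STREAM_NAME.fullmatch(name) is not None


print(is_valid_stream_name("failed_orders"))   # True
print(is_valid_stream_name("Failed-Orders"))   # False
```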

    Next, you must define the schema of the stream. The stream’s schema must match the schema of the data in the source Kafka topic. This is how Decodable validates and enables processing of data. The schema can be as simple as a single text field, or it can fully describe a complex nested record. All records in a stream must have the same schema. If you already have a schema defined in JSON Schema or Apache Avro format, you can upload it to Decodable.

    To learn more about streams, see About Streams for an overview on the types of streams available, About Change Data Capture and Change Streams for information about change streams, and Create a Stream for information about creating streams, including how to specify event time and watermark information as well as how to use computed or metadata fields.

    Note that some connectors have enhanced schema detection support. For example, the Confluent Cloud connector supports integration with the Confluent Schema Registry.

    Make sure you choose the connector that best matches your infrastructure.

  3. Name your connection

    Give your connection a meaningful name and description for future reference.

    Selecting Create Connection performs a connectivity test. If Decodable successfully connects to your Kafka brokers, a connection is created and a Connection Details page opens.

    If Decodable is unable to validate the connection, an error is shown with more information. The most common reasons for errors are:

    • Network connectivity - Decodable is unable to connect to your brokers because a firewall or security group prevents access. This often results in a timeout while trying to connect. Make sure you’ve allowed connections from Decodable’s IPs, and double-check that the broker host names and ports are correct.

    • Authentication failures - Make sure you select the correct authentication mechanism (for example mTLS, SASL SCRAM-SHA-512) and that your credentials are correct. Keep in mind that Kafka also supports ACLs on the broker, so if you’ve enabled Kafka authorization controls, make sure the credentials you’re using are permitted to access the topic you’ve specified.
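To separate network problems from authentication problems, it can help to confirm that a broker’s host and port accept TCP connections at all. The sketch below uses only the Python standard library; the function name `can_reach` and the example host are hypothetical. It runs from your own machine, so it doesn’t prove that Decodable’s IPs are allowed, and it doesn’t exercise TLS or SASL.

```python
import socket


def can_reach(host: str, port: int, timeout_s: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example (hypothetical broker address):
# can_reach("broker-1.example.com", 9092)
```

A `False` result from a network that should have access points to a firewall, security group, or host name problem rather than to credentials.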

  4. Start your new connection

    Once your connection is created, you can start it by selecting Start. When you activate a connection or pipeline, you choose the maximum task size and the maximum number of tasks. A task is a compute unit that includes CPU, memory, network bandwidth, and local storage. Most connectors and all pipelines support scale-out parallel execution, and the number of tasks controls the maximum parallelism of a connection or pipeline. A medium task is sufficient for most connections and should provide between 1,000 and 30,000 records per second depending on record size, serialization format, and similar factors.
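As a rough planning aid, you can bracket the number of tasks from the throughput range above. The sketch below assumes throughput scales linearly with the number of tasks, which is a simplification this guide doesn’t guarantee; the function name `tasks_needed` and the 50,000 records per second target are hypothetical.

```python
import math


def tasks_needed(target_records_per_s: float, per_task_records_per_s: float) -> int:
    """Estimate the task count, assuming throughput scales linearly with tasks."""
    if per_task_records_per_s <= 0:
        raise ValueError("per-task throughput must be positive")
    return max(1, math.ceil(target_records_per_s / per_task_records_per_s))


# Per-task range quoted for a medium task, in records per second.
LOW, HIGH = 1_000, 30_000
target = 50_000  # hypothetical workload

print(tasks_needed(target, HIGH))  # 2  (best case)
print(tasks_needed(target, LOW))   # 50 (worst case)
```

The wide gap between the two estimates is the point: measure with your own record size and format before settling on a maximum task count.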

    Activation can take about a minute while Decodable allocates resources. Your connection should display Starting, and eventually transition to Running. If there’s data on the topic, you should see output record counts and bytes metrics begin to increase. Note that metrics are reset to zero upon each activation, so stopping and restarting a connection clears them.

    If you don’t see metrics increasing:

    • Check that the connection started and isn’t restarting due to an error.

    • Check that new data is actively flowing if you started your connection from the Latest position.

    • Check that the topic isn’t empty if you started your connection from the Earliest position.

    • If you’ve selected the Ignore option for the Parse Error Policy, check that you’ve selected the correct Value Format for your data. With Ignore, records that can’t be parsed are skipped and discarded without raising an error.

    You can follow the lineage trail by selecting Outbound to 1 stream - your stream name to preview data in the source connection’s output stream. This is the stream you selected or created in step 1.2. Stream preview is updated whenever you navigate to a stream, but it can be manually refreshed by selecting Run Preview.

    If you don’t see data in your output stream:

    • Check that your connection is working (see above).

    • Try selecting Run Preview.

Step 2: Process your data (Optional)

Now that your data is flowing into a stream, you can optionally add processing by creating a pipeline, either in SQL or in Java. For this guide, we’ll filter our data using a standard WHERE clause in SQL. The exact fields you can reference are governed by the schema of the stream you created earlier.

  1. Create a pipeline

    Select Pipelines, and then New Pipeline. If you are participating in the public tech preview for custom pipelines, which allows you to write pipelines in Java and other JVM-based languages using the full power of the Apache Flink APIs, you can choose between SQL and a custom pipeline. For now, select SQL.

    Choose your Input Stream. This should be the stream you created or selected in step 1.2. You can always change your mind later while editing the pipeline SQL.

  2. Write your SQL

    In the SQL editor you can browse all of your existing streams in the Data Catalog panel, edit SQL, and preview the results of your SQL query.

    SQL pipelines are always insert into …​ select …​ from …​ queries. The insert into clause specifies the pipeline’s output stream, while the select …​ from …​ specifies the input streams and any transformations. Decodable automatically generates an output stream name for you (for example stream_<digits>), but you’re encouraged to change it to something meaningful to you. Decodable supports standard SQL including joins, windowed and global aggregation, Common Table Expressions (CTE), and even pattern detection with match_recognize.

    For example, let’s assume that our data contained order events. We can filter our data by configuring the following pipeline.

    insert into failed_orders -- Our output stream will be failed_orders.
    select *                  -- Select all fields, no transformation.
    from orders               -- Our input stream.
    where status = 'failed'   -- Filter events for failed orders.

    If needed, we can keep only the fields we require and mask sensitive data.

    insert into masked_failed_orders
    select
       order_id,
       total,
       line_items,
       -- address,   Omit the address field.
       postal_code,
       country,
    
       -- Mask card digits except the first 1 and the last 4.
       overlay(cc_number placing '***********' from 2 for 11) as card_masked
    from orders
    where status = 'failed'
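To see what the masking expression produces, the sketch below emulates the standard SQL OVERLAY semantics (1-based start position, fixed replacement length) in Python. The helper `overlay` is a hypothetical stand-in for illustration, and the card numbers are well-known test numbers, not real data.

```python
def overlay(text: str, replacement: str, start: int, length: int) -> str:
    """Emulate SQL OVERLAY(text PLACING replacement FROM start FOR length).

    start is 1-based, as in SQL.
    """
    prefix = text[: start - 1]
    suffix = text[start - 1 + length:]
    return prefix + replacement + suffix


masked = overlay("4111111111111111", "*" * 11, 2, 11)
print(masked)  # 4***********1111
```

Because the mask has a fixed width, it assumes 16-digit card numbers. A 15-digit number such as `378282246310005` would come out as `3***********005`, keeping only the last three digits.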

    See the Function Reference to learn more about supported SQL functions. If you’re interested in windowed aggregations, those are supported too. There are also a number of SQL pipeline examples to help get you started with common patterns.

    Next, let’s preview our pipeline to make sure the SQL is processing the data as expected. Previews display up to 30 records from the data stream and can be used to see how your pipeline changes the incoming data. Previews run until 30 records have been seen or until 2 minutes have passed, whichever comes first.
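The stopping rule for previews (30 records or 2 minutes, whichever comes first) can be sketched as follows. This is a simplified model rather than Decodable’s implementation: the function name `run_preview` and the injectable `clock` parameter are hypothetical, and the time limit is only checked when a record arrives.

```python
import time


def run_preview(records, max_records=30, max_seconds=120.0, clock=time.monotonic):
    """Collect records until the record limit or the time limit is reached."""
    deadline = clock() + max_seconds
    seen = []
    for record in records:
        if clock() >= deadline:
            break  # time limit reached first
        seen.append(record)
        if len(seen) >= max_records:
            break  # record limit reached first
    return seen


print(len(run_preview(range(100))))  # 30
print(run_preview(range(5)))         # [0, 1, 2, 3, 4]
```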

    Start the preview by selecting Run Preview. You can optionally run the preview from the earliest position in the input streams or the latest position. By default, preview runs from the latest position.

    Not seeing preview output? Here are a few things to check:

    • Make sure your input stream contains data.

    • If the input stream isn’t currently receiving data, select the Earliest position to run a preview over data that’s already in the stream.

    Once your data looks correct, select Next to continue.

  3. Create an output stream or validate an existing output stream’s schema

    If you’ve specified an output stream that doesn’t exist, Decodable prompts you to create a new stream. The stream’s schema is automatically inferred from the SQL query, including field names and their types. The stream’s name is defined by the SQL query, so it isn’t editable while displayed. As always, you should give your stream a meaningful description.

    Once everything looks correct, select Create Stream.

    Alternatively, if you specified a stream that already exists as an output stream in your SQL query, the stream’s schema must match the output of the select …​ from …​ portion of the query. You’ll get an error if this isn’t the case.
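One way to think about this check is as a field-by-field comparison. The sketch below assumes that "match" means the same field names and types in the same order, which is an assumption rather than a statement of Decodable’s exact rule; the function name `schema_mismatches` and the type names are hypothetical.

```python
def schema_mismatches(query_output, stream_schema):
    """Compare two ordered lists of (field_name, type_name) pairs.

    Returns a list of human-readable differences; empty means they match.
    """
    problems = []
    if len(query_output) != len(stream_schema):
        problems.append(
            f"field count differs: {len(query_output)} vs {len(stream_schema)}"
        )
    pairs = zip(query_output, stream_schema)
    for position, (actual, expected) in enumerate(pairs, start=1):
        if actual != expected:
            problems.append(
                f"field {position}: query produces {actual}, stream expects {expected}"
            )
    return problems


existing = [("order_id", "BIGINT"), ("status", "STRING")]
produced = [("order_id", "BIGINT"), ("status", "INT")]
print(schema_mismatches(produced, existing))
```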

  4. Name your pipeline

    Finally, just like our connection, give the new pipeline a name and description.

    Select Create Pipeline to complete the process.

  5. Start your pipeline

    Like connections, pipelines must be started in order to begin processing data.

    Select Start to activate your pipeline. Select the appropriate maximum task size and maximum task count just as we did for our connection. As with preview, select whether the pipeline should start from the earliest or latest position in the input stream.

    Pipeline metrics work identically to connection metrics with a few noteworthy exceptions:

    • Both input and output metrics exist for pipelines. This can help you understand whether or not a pipeline is filtering all records by accident, if it’s keeping up with the rate of input data, and other common operational concerns.

    • Pipelines have an additional metric that displays lag. This is the number of records that are ready for processing but haven’t yet been processed. In most cases, this metric should be zero, which means the pipeline is keeping up with the rate of new data on its input stream. Since input streams are partitioned, this metric reflects the maximum lag across all partitions, not the sum of lag over all partitions.
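The distinction between maximum and summed lag matters when you read the metric. The sketch below uses made-up per-partition values, and the function name `reported_lag` is hypothetical.

```python
def reported_lag(lag_by_partition):
    """Return the lag as described above: the maximum across partitions."""
    return max(lag_by_partition.values(), default=0)


# Hypothetical lag per partition, in records.
lag = {0: 0, 1: 120, 2: 35}

print(reported_lag(lag))   # 120
print(sum(lag.values()))   # 155, which is not what the metric shows
```

A reported lag of 120 therefore means the slowest partition is 120 records behind; the total backlog across partitions may be larger.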

    Select Output to 1 stream in the lineage bar to navigate to the pipeline’s output stream and confirm that your pipeline is processing and sending data.

Step 3: Create a sink connection

The final step is to create a connection to the data warehouse, data lake, or OLAP database that you’d like to send data to. Connections made to sinks are also called sink connections. Here, we’ll use Snowflake, but the process is the same for other systems.

We can create our sink connection the same way we did with our source - by selecting Connections and New Connection. We can also use the lineage bar while viewing a stream.

Select the ellipsis (…​) on the outbound (right-hand) side of the lineage bar and select Create a Sink Connection. This creates a connection that uses the selected stream as its input.

Find the Snowflake connector and select Connect.

  1. Configure required connection information

    Complete the required fields for the connection to Snowflake. This includes the Snowflake database, schema, table, authentication, and credentials. Since we’re working with an append stream, you can ignore the change stream merge interval and the warehouse.

    The Snowflake connector uses Snowflake Snowpipe Streaming, which doesn’t require a warehouse to be running to ingest data. This has three significant advantages:

    • Snowpipe Streaming uses significantly fewer Snowflake credits than running a warehouse. This makes data ingestion less expensive than other methods, in some cases significantly so.

    • Data is ingested directly into Snowflake’s internal storage format, requiring no staging.

    • Data is ready for query almost instantly and continuously as it’s processed.

    Select Next to validate the connection parameters.

  2. Start the connection

    Select Start to activate the sink connection. Choose the appropriate maximum task size and maximum number of tasks for your data volume.

Step 4: Celebrate! 🥳

You’re now ingesting data into your data warehouse in real time.

Here are a few things to do next: