Replicate data from a database to your data warehouse or data lake using Debezium-powered CDC

This guide walks you through replicating data from a database into your data warehouse or data lake. Decodable uses Debezium-powered Change Data Capture (CDC) to enable continuous, real-time replication from databases such as PostgreSQL, MySQL, and MongoDB to data warehouses and data lakes such as Snowflake. Because changes are replicated as they happen, you keep a fresh and accurate copy of your data in case of an outage or failure.

This guide uses Postgres as the data source and Snowflake as the data destination, but you can substitute your platforms of choice. For example, your source could instead be an event streaming platform such as Redpanda, Apache Pulsar, AWS Kinesis, or Confluent Cloud. Similarly, other common destinations include Amazon S3, Delta Lake, ClickHouse, and Apache Pinot. For a full list of supported connectors, see the following topics:

Let’s go! 🚀

Prerequisites

Before you create a connection between Postgres and Decodable, make sure that the following requirements are met.

  • Your Postgres database must be publicly accessible. Decodable uses the username and password provided during connection creation to authenticate to the database.

  • Your Postgres database must have the logical decoding feature enabled, which provides the ability to stream data changes to external consumers. How you enable this feature depends on the cloud environment that you run Postgres in. See Setting up Postgres from the Debezium documentation for more information.

  • Your Postgres database user must have sufficient permissions to create PostgreSQL publications. See Setting privileges to enable Debezium to create PostgreSQL publications when you use pgoutput from the Debezium documentation for more information.

  • Your Postgres database tables must have replica identity set to FULL.

    This means that the entire row is used to identify the row for updates and deletes. To check the replica identity of a table, run the following query in Postgres:

    SELECT relreplident
      FROM pg_class
     WHERE oid = '<table-name>'::regclass;

    For example, if your table is called orders, run:

    SELECT relreplident
      FROM pg_class
     WHERE oid = 'orders'::regclass;

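    The query returns the value f (for "full") if the replica identity is set to FULL. Other possible values are d (default, which uses the primary key if there is one), n (nothing), and i (a specific index).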

    If the replica identity isn’t yet set to FULL, run the following command:

    ALTER TABLE <table-name> REPLICA IDENTITY FULL;

  • Have your Postgres database information handy. This includes the host name of your database, the name of the database, the name of the database table, and the username and password used to log in to the database. You need to provide this information when you create the connection.
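
If you have several tables to check, a small script can turn the relreplident codes returned by the query above into the ALTER TABLE statements you still need to run. The following is a minimal Python sketch. It assumes you have already collected each table's relreplident value into a dictionary. The function name replica_identity_fixes is hypothetical, and the function only builds statements for you to review; it doesn't run them.

```python
# relreplident codes from the PostgreSQL pg_class system catalog.
REPLICA_IDENTITY = {
    "d": "default (primary key, if any)",
    "n": "nothing",
    "f": "full (all columns)",
    "i": "index",
}


def replica_identity_fixes(tables: dict[str, str]) -> list[str]:
    """Return ALTER TABLE statements for tables whose replica identity isn't FULL.

    `tables` maps a table name to the relreplident value returned by the query above.
    Table names are used verbatim, so pass them exactly as you'd write them in SQL.
    """
    fixes = []
    for table, code in sorted(tables.items()):
        if code not in REPLICA_IDENTITY:
            raise ValueError(f"unexpected relreplident {code!r} for table {table}")
        if code != "f":
            # Anything other than "f" means updates and deletes won't carry the full row.
            fixes.append(f"ALTER TABLE {table} REPLICA IDENTITY FULL;")
    return fixes
```

For example, replica_identity_fixes({"orders": "d", "customers": "f"}) returns ["ALTER TABLE orders REPLICA IDENTITY FULL;"] because only orders still uses the default replica identity.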

For video guidance on completing these prerequisites, see the Decodable YouTube video How to Enable Change Data Capture with Postgres on Amazon RDS. It shows how to prepare a Postgres database on Amazon RDS to connect to Decodable and send it CDC records.

To create a connection between Decodable and Snowflake, you must fulfill the prerequisites described in the Snowflake Connector documentation, including those listed in the "Additional Change Stream Processing Prerequisites" section. See Prerequisites: Prepare Snowflake for connections for more information.

Lastly, make sure that you have allowed connections from Decodable’s IPs. Otherwise, Decodable will be unable to connect to your database.
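
Before you create the connection, you can confirm that your database host name resolves and that its port accepts TCP connections. The following minimal Python sketch uses only the standard library socket module. The function name can_reach is hypothetical, and 5432 is simply the default Postgres port. A successful check from your own machine doesn't prove that Decodable's IPs are allowed, because firewall and security group rules can differ by source address.

```python
import socket


def can_reach(host: str, port: int = 5432, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # DNS failures, refused connections, and timeouts all raise OSError subclasses.
        return False
```

For example, can_reach("db.example.com") checks a hypothetical host on port 5432. A False result that takes the full timeout to arrive usually points to a firewall or security group silently dropping packets.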

Guide

Step 1: Create and start a source connection

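Go to Connections and select New Connection. Connections move data between external systems and Decodable streams. A source connection reads data from a source system into a stream, and a sink connection writes data from a stream to a destination.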

  1. Configure the connection

    Select PostgreSQL CDC (or your system of choice). Then, choose Source as the connection type and provide the required connection information: the host name of your database, the database name, the table name, and the database username and password.

    By default, Postgres connections operate in exactly-once delivery mode in Decodable. For more information about other PostgreSQL options, see the Postgres connector documentation.

    Select Next.

  2. Choose or create a stream

    Streams transport records within Decodable. You can select a preexisting stream to which data should be written, or you can create a new stream.

    If you’re creating a new stream, give it a name. Names must be unique within an account and can contain letters, numbers, and underscores ([a-z0-9_]+). This is how you’ll refer to your data in SQL queries and in custom pipelines written in Java. As a best practice, give each stream a description that helps you and your team remember what data it contains and how it should be used.

    Next, you must define the schema of the stream. The stream’s schema must match the schema of the data in the Postgres database. This is how Decodable validates and enables processing of data. The schema can be as simple as having a single text field or can fully describe a complex nested record. All records in a stream must have the same schema. If you already have a schema defined in JSON Schema or Apache Avro format, you can upload it to Decodable.

    After you finish entering all of the fields present in the record, you must specify one or more fields to use as the primary key. A primary key field contains a value that can be used to uniquely identify each row in a table. Decodable uses the primary key to interpret which action (insert, update, or delete) to perform in the downstream table. To designate a field to use as a primary key, you must explicitly specify that the field’s type isn’t null by entering <type> NOT NULL for the field type. For example, BIGINT NOT NULL. Then, you can select the primary key field in the Primary Key Fields box.

    To learn more about streams, see the following topics. About Streams gives an overview of the available stream types, and About Change Data Capture and Change Streams covers change streams. Create a Stream explains how to create streams, including how to specify event time and watermark information and how to use computed or metadata fields.

  3. Name your connection

    Give your connection a meaningful name and description for future reference.

    Selecting Create Connection performs a connectivity test. If Decodable successfully connects to your PostgreSQL table, a connection is created and a Connection Details page opens.

    If Decodable can’t validate the connection, it shows an error with more information. The most common error is a network connectivity error, which occurs when a firewall or security group prevents Decodable from reaching your database. This often results in a timeout while trying to connect. Make sure you’ve allowed connections from Decodable’s IPs.

  4. Start your new connection

    Once your connection is created, start it by selecting Start. When you activate a connection or pipeline, you choose the maximum task size and the maximum number of tasks. A task is a compute unit that includes CPU, memory, network bandwidth, and local storage. Most connectors and all pipelines support scale-out parallel execution, and the number of tasks controls the maximum parallelism of a connection or pipeline. A medium task is sufficient for most connections and should provide between 1,000 and 30,000 records per second, depending on record size, serialization format, and similar factors.

    Activation can take about a minute while Decodable allocates resources. Your connection should display Starting and eventually transition to Running. If your source table contains data or is receiving changes, you should see the output record count and bytes metrics begin to increase. Metrics are reset on each activation, so stopping and restarting a connection resets them to zero.

    If you don’t see metrics increasing, make sure that the connection has the status Running.

    You can follow the lineage trail by selecting Outbound to 1 stream - your stream name to preview data in the source connection’s output stream. This is the stream you selected or created in step 1.2. Stream preview is updated whenever you navigate to a stream, but it can be manually refreshed by selecting Run Preview.

    There are two types of streams in Decodable: append streams and change streams. In an append stream, new records are continuously added to the end of the stream. A change stream, in contrast, captures changes to a database in real time. Each record represents one change, such as an insert, update, or delete. The stream that you are previewing now is a change stream.

    When you preview a change stream, you can view both its physical and logical schemas. The logical schema is the one you manage, which you defined earlier. The physical schema represents how the data is stored and processed. For Debezium-formatted CDC records, the physical schema includes the type of operation performed and the values of the affected fields before and after the change. Select the Table dropdown to toggle between the logical and physical schemas of the incoming data.

    If you don’t see data in your output stream:

    • Check that your connection has the status Running.

    • Try selecting Run Preview.
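
If you create streams from scripts or review names in bulk, you can check a proposed name against the pattern given in step 1.2 before entering it. This Python sketch applies the pattern [a-z0-9_]+ literally, so it rejects uppercase letters and hyphens. It doesn't check that the name is unique within your account. The function name is hypothetical.

```python
import re

# Naming pattern from step 1.2, applied to the whole name rather than a substring.
STREAM_NAME_PATTERN = re.compile(r"[a-z0-9_]+")


def is_valid_stream_name(name: str) -> bool:
    """Return True if `name` matches [a-z0-9_]+ in full (uniqueness isn't checked)."""
    return STREAM_NAME_PATTERN.fullmatch(name) is not None
```

Using fullmatch rather than match or search matters here. A name like failed-orders contains a valid prefix but should still be rejected.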
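
To make the roles of the physical schema and the primary key more concrete, the following Python sketch replays Debezium-style change records against an in-memory table keyed by primary key. The op, before, and after fields follow the Debezium change event envelope, where op is c (create), u (update), d (delete), or r (read during a snapshot). This sketch illustrates the idea only and is not how Decodable implements change streams. The function names are hypothetical.

```python
from typing import Any

Row = dict[str, Any]


def row_key(row: Row, key_fields: list[str]) -> tuple:
    """Build a hashable primary-key tuple from the given key fields."""
    return tuple(row[f] for f in key_fields)


def apply_change(table: dict[tuple, Row], record: dict[str, Any], key_fields: list[str]) -> None:
    """Apply one Debezium-style change record ({"op", "before", "after"}) to an in-memory table."""
    op = record["op"]
    if op in ("c", "r"):
        # c = insert, r = row read during the initial snapshot.
        after = record["after"]
        table[row_key(after, key_fields)] = after
    elif op == "u":
        before, after = record.get("before"), record["after"]
        if before is not None:
            # With REPLICA IDENTITY FULL, "before" is populated; drop the old key
            # in case the update changed the primary key itself.
            table.pop(row_key(before, key_fields), None)
        table[row_key(after, key_fields)] = after
    elif op == "d":
        # Deletes carry the removed row in "before" and a null "after".
        table.pop(row_key(record["before"], key_fields), None)
    else:
        raise ValueError(f"unsupported op {op!r}")
```

Replaying an insert of order 1 with status new, followed by an update to status failed, leaves a single row {(1,): {"order_id": 1, "status": "failed"}}. This is the logical view that the primary key lets a downstream table maintain.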

Step 2: Process your data (Optional)

Now that your data is flowing into a stream, you can optionally process it by creating a pipeline, either in SQL or in Java. For this guide, we’ll filter our data using a standard where clause in SQL. The fields you can reference are governed by the schema of the stream you created earlier.

  1. Create a pipeline

    Select Pipelines, and then New Pipeline. If you are participating in the public tech preview for custom pipelines, which lets you write pipelines in Java and other JVM-based languages using the full power of the Apache Flink APIs, you can choose between SQL and a custom pipeline. For now, select SQL.

    Choose your Input Stream. This should be the stream you created or selected in step 1.2. You can always change your mind later while editing the pipeline SQL.

  2. Write your SQL

    In the SQL editor you can browse all of your existing streams in the Data Catalog panel, edit SQL, and preview the results of your SQL query.

    SQL pipelines are always insert into … select … from … queries. The insert into clause specifies the pipeline’s output stream, while the select … from … clause specifies the input streams and any transformations. Decodable automatically generates an output stream name for you (for example, stream_<digits>), but you’re encouraged to change it to something meaningful. Decodable supports standard SQL, including joins, windowed and global aggregation, Common Table Expressions (CTEs), and pattern detection with match_recognize.

    For example, let’s assume that our data contained order events. We can filter our data by configuring the following pipeline.

    insert into failed_orders -- Our output stream will be failed_orders.
    select *                  -- Select all fields, no transformation.
    from orders               -- Our input stream.
    where status = 'failed'   -- Filter events for failed orders.

    If necessary, we can also keep only the fields we need and mask sensitive data.

    insert into masked_failed_orders
    select
       order_id,
       total,
       line_items,
       -- address,   Omit the address field.
       postal_code,
       country,
    
       -- Mask all digits except the first 1 and the last 4 (assumes a 16-digit card number).
       overlay(cc_number placing '***********' from 2 for 11) as card_masked
    from orders
    where status = 'failed'

    See the Function Reference to learn more about supported SQL functions. There are also a number of SQL pipeline examples to help you get started with common patterns.

    Next, let’s preview our pipeline to make sure the SQL is processing the data as expected. Previews display up to 30 records from the data stream and can be used to see how your pipeline changes the incoming data. Previews run until 30 records have been seen or until 2 minutes have passed, whichever comes first.

    Start the preview by selecting Run Preview. You can optionally run the preview from the earliest position in the input streams or the latest position. By default, preview runs from the latest position.

    Not seeing preview output? Here are a few things to check:

    • Make sure your input stream contains data.

    • If the input stream isn’t currently receiving data, select the Earliest position to run a preview over data that’s already in the stream.

    Once your data looks correct, select Next to continue.

  3. Create an output stream or validate an existing output stream’s schema

    If you’ve specified an output stream that doesn’t exist, Decodable prompts you to create a new stream. The stream’s schema, including field names and types, is automatically inferred from the SQL query. The stream’s name comes from the SQL query, so you can’t edit it here. As always, give your stream a meaningful description.

    Once everything looks correct, select Create Stream.

    Alternatively, if your SQL query specifies an existing stream as its output stream, that stream’s schema must match the output of the select … from … portion of the query. Otherwise, you’ll get an error.

  4. Name your pipeline

    Finally, just like our connection, give the new pipeline a name and description.

    Select Create Pipeline to complete the process.

  5. Start your pipeline

    Like connections, pipelines must be started in order to begin processing data.

    Select Start to activate your pipeline. Select the appropriate maximum task size and maximum task count just as we did for our connection. Like preview, select whether the pipeline should start from the earliest or latest position in the input stream.

    Pipeline metrics work identically to connection metrics with a few noteworthy exceptions:

    • Both input and output metrics exist for pipelines. This can help you understand whether or not a pipeline is filtering all records by accident, if it’s keeping up with the rate of input data, and other common operational concerns.

    • A lag metric, which is the number of records that are ready for processing but haven’t yet been processed. In most cases, this metric should be zero, which means the pipeline is keeping up with the rate of new data on its input stream. Because input streams are partitioned, this metric reflects the maximum lag across all partitions, not the sum of lag over all partitions.

    Select Output to 1 stream in the lineage bar to navigate to the pipeline’s output stream and confirm that your pipeline is processing and sending data.
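
The card-masking expression in the second pipeline example overlays 11 characters starting at position 2. That hides the right digits only for 16-digit card numbers. To check what the expression does for other lengths, the following Python sketch mimics SQL OVERLAY semantics (1-based start position, FOR length). It also shows a length-aware alternative. The helper names are hypothetical, and this is a way to reason about the SQL, not part of the pipeline.

```python
def overlay(s: str, placing: str, start: int, length: int) -> str:
    """Mimic SQL OVERLAY(s PLACING placing FROM start FOR length); start is 1-based."""
    return s[: start - 1] + placing + s[start - 1 + length :]


def mask_card(cc_number: str) -> str:
    """Same expression as the masking pipeline: 11 asterisks from position 2."""
    return overlay(cc_number, "*" * 11, 2, 11)


def mask_keep_ends(cc_number: str, head: int = 1, tail: int = 4) -> str:
    """Length-aware alternative: keep `head` leading and `tail` trailing characters."""
    hidden = len(cc_number) - head - tail
    if hidden <= 0:
        return cc_number  # Too short to mask anything.
    return cc_number[:head] + "*" * hidden + cc_number[len(cc_number) - tail :]
```

For a 15-digit card number, mask_card leaves only the last 3 digits visible, while mask_keep_ends keeps the last 4.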
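
Whether lag is aggregated as a maximum or a sum matters when one partition falls behind the others. The following Python sketch models a partitioned input stream as per-partition record counts, which is an assumption made for illustration. It then computes lag the way the metric is described in step 2.5. The function names are hypothetical.

```python
def partition_lag(available: dict[int, int], processed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag: records available in a partition minus records already processed."""
    return {p: available[p] - processed.get(p, 0) for p in available}


def reported_lag(available: dict[int, int], processed: dict[int, int]) -> int:
    """Aggregate lag the way the metric is described: the maximum over partitions, not the sum."""
    return max(partition_lag(available, processed).values(), default=0)
```

With 100, 250, and 80 records available in partitions 0, 1, and 2, and 100, 200, and 70 processed, the per-partition lags are 0, 50, and 10. The reported lag is therefore 50, even though 60 records are waiting in total.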

Step 3: Create a sink connection

The final step is to create a connection to the data warehouse, data lake, or OLAP database that you’d like to send data to. Connections made to destinations are also called sink connections. Here, we’ll use Snowflake, but the process is the same for other systems.

At this point, make sure that you’ve completed the prerequisites for using the Snowflake Connector as described in the Prerequisites: Prepare Snowflake for connections section.

We can create our sink connection the same way we did with our source: by selecting Connections and New Connection. However, we can also use the lineage bar while viewing a stream.

Select the ellipsis (…) on the outbound right-hand side of the lineage bar and select Create a Sink Connection. This creates a connection that uses the selected stream as its input.

Find the Snowflake connector and select Connect.

  1. Configure required connection information

    Complete the required fields for the connection to Snowflake, including the Snowflake database, schema, table, authentication, and credentials. Because we’re sending change data capture records, you also need to provide the warehouse that you created as part of the prerequisites and a merge interval. Decodable sends change records to the Snowflake destination table in batches, and the Merge Interval parameter defines how long to wait between batches.

    For more information about the Snowflake Connector, including how Decodable sends change data capture records to Snowflake, see the Snowflake connector documentation.

    Select Next to validate the connection parameters.

  2. Start the connection

    Select Start to activate the sink connection. Choose the appropriate maximum task size and maximum number of tasks for your data volume.
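
To build intuition for the merge interval, the following Python sketch models batched delivery of change records. It buffers Debezium-style records, keeps only the latest change per primary key, and releases a batch once the merge interval has elapsed. This is a conceptual model only, not Decodable's implementation; see the Snowflake connector documentation for how change records are actually applied. MergeBuffer and its methods are hypothetical, and the sketch assumes primary keys never change.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MergeBuffer:
    """Hypothetical model: collect change records and release one batch per merge interval."""

    merge_interval_s: float
    key_fields: list[str]
    pending: dict[tuple, dict[str, Any]] = field(default_factory=dict)
    last_flush: float = 0.0

    def add(self, record: dict[str, Any]) -> None:
        # Deletes carry the row in "before"; inserts and updates carry it in "after".
        row = record["before"] if record["op"] == "d" else record["after"]
        key = tuple(row[f] for f in self.key_fields)
        # A later change to the same key replaces an earlier one within the batch.
        self.pending[key] = record

    def maybe_flush(self, now: float) -> list[dict[str, Any]]:
        """Return the pending batch if the merge interval has elapsed, else an empty list."""
        if now - self.last_flush < self.merge_interval_s:
            return []
        batch = list(self.pending.values())
        self.pending.clear()
        self.last_flush = now
        return batch
```

In this model, a shorter merge interval means fresher data in the destination but more frequent merge work. A longer interval lets more changes to the same key collapse into one.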

Step 4: Celebrate! 🥳 🥳

You’re now ingesting data into your data warehouse in real time.

Here are a few things to do next: