Postgres source connector

Use the PostgreSQL CDC (Change Data Capture) Connector to get data from PostgreSQL into Decodable. The PostgreSQL CDC Connector is powered by Debezium and able to extract and send change events (INSERTS, UPDATES, and DELETES) through Decodable.

If you are looking for instructions on how to send data from Decodable into Postgres, see PostgreSQL in the Connect to a data destination chapter.

Features

Delivery guarantee

Exactly once

Write to multiple streams

Yes

Prerequisites

Before you can get data from PostgreSQL, the following requirements must be met:

  • Your Postgres database must be accessible from the Decodable network. Contact Decodable support for more information about enabling network access to your infrastructure.

  • Your Postgres database must have the logical decoding feature enabled, which provides the ability to stream data changes to external consumers. The way you enable this feature depends on the Cloud environment that you are running Postgres in. See Setting up Postgres from the Debezium documentation for more information.

  • Your Postgres database user must have sufficient permissions to create PostgreSQL publications. See Setting privileges to enable Debezium to create PostgreSQL publications when you use pgoutput from the Debezium documentation for more information.

  • Your Postgres database tables must have replica identity set to FULL.

    This means the entire row data will be used to identify the row for updates/deletes. To check the replica identity of your table, you may run the following query in Postgres:

    SELECT relreplident
      FROM pg_class
     WHERE oid = '<table-name>'::regclass;

    So if your table is called orders you would run:

    SELECT relreplident
      FROM pg_class
     WHERE oid = 'orders'::regclass;

    The above query will return the value f (for "full"), if the replica identity is set.

    If the identity isn’t yet set to FULL, run the following command:

    ALTER TABLE <table-name> REPLICA IDENTITY FULL
  • Have your Postgres database information handy. This includes the host name of your database, the name of the database, the name of the database table, and the username and password used to login to the database. You will need to provide this information as part of connection creation.

For video guidance on completing these prerequisites, see the Decodable YouTube video How to Enable Change Data Capture with Postgres on Amazon RDS to learn how to prepare a Postgres database on Amazon RDS that can connect and send CDC records to Decodable.

Steps

If you want to use the Decodable CLI or API to create the connection, you can refer to the Property Name column for information about what the underlying property names are. The connector name is postgres-cdc.
  1. From the Connections page, select PostgreSQL CDC and complete the following fields.

    UI Field Property Name Description

    Connection Type

    N/A

    Select Source to use this connector to get data into Decodable.

    Host

    host

    The host where the PostgreSQL database can be reached. For example, postgres-server.

    Port

    port

    Optional. The port where the PostgreSQL database can be reached.

    Defaults to 5432.

    Database

    database-name

    The name of the database.

    Schema

    schema-name

    The name of the schema containing the table that you want to receive data from.

    Defaults to public.

    Table Name

    table-name

    The name of the database table for the connection.

    Username

    username

    The username to use to authenticate to PostgreSQL.

    Password

    password

    The password associated with the username. This must be provided as a secret resource. If you are using the Decodable CLI, run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

    Decoding Plugin Name

    decoding.plugin.name

    Optional. The plugin to use for reading data from the database.

    Defaults to pgoutput.

  2. Select the stream that you’d like to connect to this connector. Then, select Next.

  3. Define the connection’s schema. Select New Schema to manually enter the fields and field types present or Import Schema if you want to paste the schema in Avro or JSON format.

    1. The stream’s schema must match the schema of the data that you plan on sending through this connection.

    2. You must designate one or more fields to use as a primary key. A primary key is a field that contains a value that can be used to uniquely identify each row in a table. To specify a primary key, you must first explicitly tell Decodable that the type isn’t null explicitly by entering: <type> NOT NULL. For example: BIGINT NOT NULL.

    3. For more information about Change Data Capture, change streams, or creating a stream, see the following pages:

  4. Select Next when you are finished providing defining the connection’s schema.

  5. Give the newly created connection a Name and Description and select Save.

Postgres replication slots

This connector will create a replication slot in the Postgres database upon creation, named decodable_<connection-id>. This slot tracks how far the Write Ahead Log (WAL) has been consumed by the connection at a given point. However the connector currently doesn’t delete that slot when it’s deleted in Decodable itself.

This causes unbounded WAL growth in the source Postgres database, as that replication slot will never be consumed again. Therefore if you delete this connection you should also manually delete the associated replication slot. To do so, run the following command in a Postgres session:

SELECT pg_drop_replication_slot(<slot_name>)
  FROM pg_replication_slots;

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding Postgres data types.

Postgres Type Decodable Type

INT2

SMALLINT

SMALLSERIAL

SERIAL2

SMALLINT

INT

INT4

INTEGER

SERIAL

INT

INT8

BIGINT

BIGSERIAL

BIGINT

FLOAT4

REAL

FLOAT

FLOAT8

DOUBLE

DECIMAL(p, s)

DEC(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

BOOL

BOOLEAN

BOOLEAN `

DATE

DATE

TIME(p)

TIMETZ(p)

TIME(p)

TIME(6) if no precision provided

TIMESTAMP(p)

TIMESTAMP(p)

TIMESTAMP(9) if no precision provided

TIMESTAMPTZ(p)

TIMESTAMP_LTZ(p)

TIMESTAMP_LTZ(9) if no precision provided

BPCHAR

CHAR(2147483647)

BPCHAR(n)

CHAR(n)

CHARACTER(n)

NCHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

JSON

JSONB

TEXT

XML

STRING

UUID

CHAR(36)

BIT

BYTEA

VARBIT

BYTES

ACLITEM

BOX

CID

CIDR

CIRCLE

GTSVECTOR

INET

INT2VECTOR

INTERVAL

JSONPATH

LINE

LSEG

MACADDR

MACADDR8

MONEY

NAME

OID

OIDVECTOR

PATH

POINT

POLYGON

REFCURSOR

TID

TSQUERY

TSVECTOR

TXID_SNAPSHOT

XID

XID8

Not supported