Postgres source connector

Features

Connector name

postgres-cdc

Delivery guarantee

Exactly once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can read from multiple tables and schemas from a single database.

Supported stream types

This connector will write a Change stream so long as the source tables have a primary key (PK) defined. If a PK isn’t defined then an Append stream will be written instead.

Configuration properties

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Use the properties below to configure the connector. Refer to the documentation for instructions on creating and editing connections.

Property Description Required Default

Basic

host

The host of the Postgres server

Yes

port

The port of the Postgres server

5432

username

The username to use to authenticate to Postgres

Yes

password

The password associated with the username. This must be provided as a secret resource.

Yes

database-name

The name of the database.

Yes

Advanced

decoding.plugin.name

The plugin to use for reading data from the database.

pgoutput

Prerequisites

  • Your Postgres database must be accessible from the Decodable network. Connectivity options include AWS PrivateLink, SSH tunnels, and allowing connections from the Decodable published IP addresses..

  • Your Postgres database must have the logical decoding feature enabled, which provides the ability to stream data changes to external consumers. The way you enable this feature depends on the Cloud environment that you are running Postgres in. See Setting up Postgres from the Debezium documentation for more information.

  • Your Postgres database user must have sufficient permissions to create Postgres publications. See Setting privileges to enable Debezium to create Postgres publications when you use pgoutput from the Debezium documentation for more information.

  • Your Postgres database tables must have replica identity set to FULL.

    This means the entire row data will be used to identify the row for updates/deletes. To check the replica identity of your table, you may run the following query in Postgres:

    SELECT relreplident
      FROM pg_class
     WHERE oid = '<table-name>'::regclass;

    So if your table is called orders you would run:

    SELECT relreplident
      FROM pg_class
     WHERE oid = 'orders'::regclass;

    The above query will return the value f (for "full"), if the replica identity is set.

    If the identity isn’t yet set to FULL, run the following command:

    ALTER TABLE <table-name> REPLICA IDENTITY FULL
🎥 For more details, see our YouTube video How to Enable Change Data Capture with Postgres on Amazon RDS

Ingesting data from multiple tables

A single instance of this connector can read from multiple tables and schemas from a single database.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

Name Description

database-name

The database name

schema-name

The schema name

table-name

The table name

Transaction Log retention

If the connection is stopped or in a failed state for longer than the transaction log’s retention period, the connection will fail when it’s restarted. This is because for CDC to work it needs a contiguous series of transaction log entries.

If you want to restart the connection in this situation you must discard its current state. By doing this, the initial snapshot of the required tables will be taken again and then the transaction log used for subsequent reads.

To do this do, one of the following:

  1. In the Decodable Web UI, select Start and under Starting State select Reset current state and start from the initial state

  2. In the Decodable CLI, do one of the following:

    1. Use connection activate and add the --force flag, for example:

      decodable connection activate cef0e708 --force

      or

    2. Use query with a suitable specifier for the connection (such as --name) and add the --operation reset-state argument, for example:

      decodable query --name customers-source --operation reset-state

Postgres replication slots

This connector will create a replication slot in the Postgres database upon creation, named decodable_<connection-id>. This slot tracks how far the Write Ahead Log (WAL) has been consumed by the connection at a given point.

Starting from Postgres 16, this connector can also connect to a replication slot on a read replica.

Upon the first start-up, the Decodable Postgres CDC connector creates a publication for ALL TABLES. Alternatively, you can create this publication yourself prior to starting the connection. You might do this if you want to publish just a specific set of tables. To do so, create a publication with the name dbz_publication.

The connector currently doesn’t delete its replication slot when it’s deleted in Decodable itself. This causes unbounded WAL growth in the source Postgres database, as that replication slot will never be consumed again. Therefore if you delete this connection you should also manually delete the associated replication slot. To do so, run the following command in a Postgres session:

SELECT pg_drop_replication_slot(<slot_name>)
  FROM pg_replication_slots;

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding Postgres data types.

Postgres Type Decodable Type

INT2

SMALLINT

SMALLSERIAL

SERIAL2

SMALLINT

INT

INT4

INTEGER

SERIAL

INT

INT8

BIGINT

BIGSERIAL

BIGINT

FLOAT4

REAL

FLOAT

FLOAT8

DOUBLE

DECIMAL(p, s)

DEC(p, s)

NUMERIC(p, s)

DECIMAL(p, s)

BOOL

BOOLEAN

BOOLEAN

DATE

DATE

TIME(p)

TIMETZ(p)

TIME(p)

TIME(6) if no precision provided

TIMESTAMP(p)

TIMESTAMP(p)

TIMESTAMP(9) if no precision provided

TIMESTAMPTZ(p)

TIMESTAMP_LTZ(p)

TIMESTAMP_LTZ(9) if no precision provided

BPCHAR

CHAR(2147483647)

BPCHAR(n)

CHAR(n)

CHARACTER(n)

NCHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

JSON

JSONB

TEXT

XML

STRING

UUID

CHAR(36)

BIT

BYTEA

VARBIT

BYTES

ACLITEM

BOX

CID

CIDR

CIRCLE

GEOMETRY

GTSVECTOR

INET

INT2VECTOR

INTERVAL

JSONPATH

LINE

LSEG

MACADDR

MACADDR8

MONEY

NAME

OID

OIDVECTOR

PATH

POINT

POLYGON

REFCURSOR

TID

TSQUERY

TSVECTOR

TXID_SNAPSHOT

XID

XID8

Not supported