Postgres source connector

Features

Connector name

postgres-cdc

Delivery guarantee

Exactly once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can read from multiple tables and schemas from a single database.

Supported stream types

This connector will write a Change stream so long as the source tables have a primary key (PK) defined. If a PK isn’t defined then an Append stream will be written instead.
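To anticipate which stream type each table will produce, you can list the tables that lack a primary key. The query below is a minimal sketch against the standard Postgres system catalogs. Excluding the system schemas is an assumption about which tables you intend to ingest.

```sql
-- List ordinary tables that have no primary key constraint.
-- Per the rule above, these would be written as Append streams.
SELECT n.nspname AS schema_name,
       c.relname AS table_name
  FROM pg_class c
  JOIN pg_namespace n ON n.oid = c.relnamespace
 WHERE c.relkind = 'r'                      -- ordinary tables only
   AND n.nspname NOT IN ('pg_catalog', 'information_schema')
   AND NOT EXISTS (
         SELECT 1
           FROM pg_constraint con
          WHERE con.conrelid = c.oid
            AND con.contype = 'p'           -- 'p' = primary key
       )
 ORDER BY 1, 2;
```

An empty result means every ordinary table in the database has a PK.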

Configuration properties

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using the decodable connection scan command.

Use the properties below to configure the connector. Refer to the documentation for instructions on creating and editing connections.

Basic

| Property | Description | Required | Default |
| --- | --- | --- | --- |
| host | The host of the Postgres server. | Yes | |
| port | The port of the Postgres server. | | 5432 |
| username | The username to use to authenticate to Postgres. | Yes | |
| password | The password associated with the username. This must be provided as a secret resource. | Yes | |
| database-name | The name of the database. | Yes | |

Advanced

| Property | Description | Required | Default |
| --- | --- | --- | --- |
| decoding.plugin.name | The plugin to use for reading data from the database. | | pgoutput |

Prerequisites

🎥 For more details, see our YouTube video "How to Enable Change Data Capture with Postgres on Amazon RDS".
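This page does not spell out the database-side settings, so the checks below are only a sketch. They rest on the general Postgres requirements for logical decoding, which the pgoutput plugin relies on, rather than on anything specific to Decodable. The role name decodable_user is hypothetical.

```sql
-- Logical decoding requires wal_level = logical.
SHOW wal_level;

-- Check whether a role has the REPLICATION attribute.
-- 'decodable_user' is a hypothetical role name; substitute your own.
SELECT rolname, rolreplication
  FROM pg_roles
 WHERE rolname = 'decodable_user';
```

On managed services such as Amazon RDS, replication privileges are typically granted through a service-specific role rather than the REPLICATION attribute, so rolreplication may be false even when the role is set up correctly.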

Ingesting data from multiple tables

A single instance of this connector can read from multiple tables and schemas from a single database.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the resources that you want to ingest using the decodable connection scan command.

Resource specifier keys

The following resource specifier keys are available:

| Name | Description |
| --- | --- |
| database-name | The database name |
| schema-name | The schema name |
| table-name | The table name |

Transaction log retention

If the connection is stopped or in a failed state for longer than the transaction log’s retention period, the connection will fail when it’s restarted. This is because CDC needs a contiguous series of transaction log entries to work.

If you want to restart the connection in this situation, you must discard its current state. Doing so causes the initial snapshot of the required tables to be taken again, after which the transaction log is used for subsequent reads.

To do this, do one of the following:

  1. In the Decodable Web UI, select Start and under Starting State select Reset current state and start from the initial state

  2. In the Decodable CLI, do one of the following:

    1. Use connection activate and add the --force flag, for example:

      decodable connection activate cef0e708 --force

      or

    2. Use query with a suitable specifier for the connection (such as --name) and add the --operation reset-state argument, for example:

      decodable query --name customers-source --operation reset-state

Postgres replication slots

When the connection is created, this connector creates a replication slot in the Postgres database named decodable_<connection-id>. This slot tracks how far the connection has consumed the Write Ahead Log (WAL).
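To see how far a slot has progressed, you can query the pg_replication_slots view. The following is a minimal sketch that assumes you run it on the primary. The LIKE pattern relies on the decodable_ naming prefix described above.

```sql
-- Show each Decodable slot and how much WAL it is holding back.
-- pg_current_wal_lsn() works on a primary (Postgres 10 or later).
SELECT slot_name,
       plugin,
       active,
       confirmed_flush_lsn,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
       ) AS retained_wal
  FROM pg_replication_slots
 WHERE slot_name LIKE 'decodable\_%';
```

A retained_wal value that keeps growing while active is false suggests the connection is not consuming the slot. On a read replica, pg_current_wal_lsn() raises an error because the server is in recovery; pg_last_wal_replay_lsn() is the usual substitute there.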

Starting from Postgres 16, this connector can also connect to a replication slot on a read replica.

Upon the first start-up, the Decodable Postgres CDC connector creates a publication for ALL TABLES. Alternatively, you can create this publication yourself prior to starting the connection. You might do this if you want to publish just a specific set of tables. To do so, create a publication with the name dbz_publication.
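As a sketch of the manual route, the statements below create a publication limited to two tables and then confirm what it covers. The table names public.orders and public.customers are hypothetical; only the publication name dbz_publication comes from this page.

```sql
-- Publish only the tables you want the connector to read.
-- public.orders and public.customers are hypothetical table names.
CREATE PUBLICATION dbz_publication
  FOR TABLE public.orders, public.customers;

-- Confirm which tables the publication covers.
SELECT schemaname, tablename
  FROM pg_publication_tables
 WHERE pubname = 'dbz_publication';
```

Run this before the connection starts for the first time, so that the connector finds the existing publication.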

The connector currently doesn’t delete its replication slot when the connection is deleted in Decodable. Because that slot will never be consumed again, the source Postgres database retains WAL without bound. Therefore, if you delete this connection you should also manually drop the associated replication slot. To do so, run the following command in a Postgres session:

SELECT pg_drop_replication_slot(slot_name)
  FROM pg_replication_slots
 WHERE slot_name = '<slot_name>';
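Before dropping anything, it can help to confirm which slots are actually orphaned, since pg_drop_replication_slot raises an error for a slot that is still in use. This sketch assumes the decodable_ naming prefix described earlier.

```sql
-- Find Decodable slots that no session is currently consuming.
SELECT slot_name, active, active_pid
  FROM pg_replication_slots
 WHERE slot_name LIKE 'decodable\_%'
   AND NOT active;
```

A slot listed here may belong to a connection that is merely stopped, so check the connection ID against Decodable before dropping it.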

Replica identity

The replica identity configuration of a table determines the level of detail stored by Postgres for each record, for the sake of data replication.

A replica identity of FULL means that the entire previous value of a record is stored in Postgres, and emitted for record updates/deletes.

Replica identity DEFAULT means that only the primary key (PK) of the record is emitted for updates/deletes. Under this configuration, it becomes the responsibility of the connector to store the entire value of the previous record for each PK.

The decision between replica identity FULL and DEFAULT is essentially a tradeoff: FULL increases WAL size in your Postgres database, while DEFAULT increases state size in your Decodable source connection, may require a larger task size, and can slow processing.

Decodable will attempt to infer the replica identity of your tables at connection startup time, and behave accordingly.

To check the replica identity of your table, run the following query in Postgres:

SELECT relreplident
  FROM pg_class
 WHERE oid = '<table-name>'::regclass;

So if your table is called orders you would run:

SELECT relreplident
  FROM pg_class
 WHERE oid = 'orders'::regclass;

If the replica identity is set, the query returns the value f (full) or d (default). Postgres can also report n (nothing) or i (index) for tables configured that way.
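When a connection reads many tables, checking them one at a time is tedious. The query below is a sketch that reports the replica identity of every ordinary table in one schema, with the single-letter codes spelled out. The schema name public is an assumption; substitute your own.

```sql
-- Report the replica identity of each table in a schema.
SELECT n.nspname AS schema_name,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'default'
         WHEN 'f' THEN 'full'
         WHEN 'n' THEN 'nothing'
         WHEN 'i' THEN 'index'
       END AS replica_identity
  FROM pg_class c
  JOIN pg_namespace n ON n.oid = c.relnamespace
 WHERE c.relkind = 'r'        -- ordinary tables only
   AND n.nspname = 'public'   -- hypothetical schema; substitute your own
 ORDER BY 2;
```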

If the identity isn’t yet set, run the following command, with the appropriate replica identity level for your needs:

ALTER TABLE <table-name> REPLICA IDENTITY FULL;

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding Postgres data types.

| Postgres Type | Decodable Type |
| --- | --- |
| INT2, SMALLINT, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, INT4, INTEGER, SERIAL | INT |
| INT8, BIGINT, BIGSERIAL | BIGINT |
| FLOAT4, REAL | FLOAT |
| FLOAT8 | DOUBLE |
| DECIMAL(p, s), DEC(p, s), NUMERIC(p, s) | DECIMAL(p, s) |
| BOOL, BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME(p), TIMETZ(p) | TIME(p); TIME(6) if no precision provided |
| TIMESTAMP(p) | TIMESTAMP(p); TIMESTAMP(9) if no precision provided |
| TIMESTAMPTZ(p) | TIMESTAMP_LTZ(p); TIMESTAMP_LTZ(9) if no precision provided |
| BPCHAR | CHAR(2147483647) |
| BPCHAR(n), CHAR(n), CHARACTER(n), NCHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| JSON, JSONB, TEXT, XML, ENUM [1], CITEXT | STRING |
| UUID | CHAR(36) |
| BIT, BYTEA, VARBIT | BYTES |
| ACLITEM, BOX, CID, CIDR, CIRCLE, GEOMETRY, GTSVECTOR, INET, INT2VECTOR, INTERVAL, JSONPATH, LINE, LSEG, MACADDR, MACADDR8, MONEY, NAME, OID, OIDVECTOR, PATH, POINT, POLYGON, REFCURSOR, TID, TSQUERY, TSVECTOR, TXID_SNAPSHOT, XID, XID8 | Not supported |


1. ENUM is the stand-in name used in place of any custom user-defined enumerated types.
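To check ahead of time whether any columns use a type listed as not supported, you can query information_schema.columns. This is a sketch only: it assumes each unsupported type name above corresponds to the lowercase udt_name value Postgres reports, and it does not detect arrays of these types.

```sql
-- Find columns whose type appears in the "Not supported" list above.
SELECT table_schema, table_name, column_name, udt_name
  FROM information_schema.columns
 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
   AND udt_name IN (
         'aclitem', 'box', 'cid', 'cidr', 'circle', 'geometry',
         'gtsvector', 'inet', 'int2vector', 'interval', 'jsonpath',
         'line', 'lseg', 'macaddr', 'macaddr8', 'money', 'name',
         'oid', 'oidvector', 'path', 'point', 'polygon', 'refcursor',
         'tid', 'tsquery', 'tsvector', 'txid_snapshot', 'xid', 'xid8'
       )
 ORDER BY table_schema, table_name, ordinal_position;
```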