Postgres source connector

Features

Connector name: postgres-cdc
Delivery guarantee: Exactly once
Supported task sizes: S, M, L
Multiplex capability: A single instance of this connector can read from multiple tables and schemas from a single database.
Supported stream types: This connector will write a Change stream so long as the source tables have a primary key (PK) defined. If a PK isn’t defined then an Append stream will be written instead.

Configuration properties

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Use the properties below to configure the connector. Refer to the documentation for instructions on creating and editing connections.

| Property | Description | Required | Default |
|---|---|---|---|
| Basic | | | |
| host | The host of the Postgres server | Yes | |
| port | The port of the Postgres server | No | 5432 |
| username | The username to use to authenticate to Postgres | Yes | |
| password | The password associated with the username. This must be provided as a secret resource. | Yes | |
| database-name | The name of the database. | Yes | |
| Advanced | | | |
| decoding.plugin.name | The plugin to use for reading data from the database. | No | pgoutput |

Prerequisites

Your Postgres database must be accessible from the Decodable network. Connectivity options include AWS PrivateLink, SSH tunnels, and allowing connections from the Decodable published IP addresses.

Your Postgres database must have the logical decoding feature enabled, which provides the ability to stream data changes to external consumers. The way you enable this feature depends on the Cloud environment that you are running Postgres in. See Setting up Postgres from the Debezium documentation for more information.

Your Postgres database user must have sufficient permissions to create Postgres publications.
See Setting privileges to enable Debezium to create Postgres publications when you use pgoutput from the Debezium documentation for more information.

Your Postgres database tables must have replica identity set to FULL. This means the entire row data will be used to identify the row for updates/deletes. To check the replica identity of your table, you may run the following query in Postgres:

    SELECT relreplident FROM pg_class WHERE oid = '<table-name>'::regclass;

So if your table is called orders you would run:

    SELECT relreplident FROM pg_class WHERE oid = 'orders'::regclass;

The query returns the value f (for "full") if the replica identity is set to FULL. If the identity isn’t yet set to FULL, run the following command:

    ALTER TABLE <table-name> REPLICA IDENTITY FULL;

🎥 For more details, see our YouTube video How to Enable Change Data Capture with Postgres on Amazon RDS.

Ingesting data from multiple tables

A single instance of this connector can read from multiple tables and schemas from a single database.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

| Name | Description |
|---|---|
| database-name | The database name |
| schema-name | The schema name |
| table-name | The table name |

Transaction Log retention

If the connection is stopped or in a failed state for longer than the transaction log’s retention period, the connection will fail when it’s restarted. This is because CDC needs a contiguous series of transaction log entries to work.

If you want to restart the connection in this situation you must discard its current state. When you do this, the initial snapshot of the required tables is taken again, and the transaction log is then used for subsequent reads.
To do this, do one of the following:

- In the Decodable Web UI, select Start and under Starting State select Reset current state and start from the initial state.
- In the Decodable CLI, do one of the following:
  - Use connection activate and add the --force flag, for example:

        decodable connection activate cef0e708 --force

  - Use query with a suitable specifier for the connection (such as --name) and add the --operation reset-state argument, for example:

        decodable query --name customers-source --operation reset-state

Postgres replication slots

This connector will create a replication slot in the Postgres database upon creation, named decodable_<connection-id>. This slot tracks how far the Write Ahead Log (WAL) has been consumed by the connection at a given point. Starting from Postgres 16, this connector can also connect to a replication slot on a read replica.

Upon the first start-up, the Decodable Postgres CDC connector creates a publication for ALL TABLES. Alternatively, you can create this publication yourself prior to starting the connection. You might do this if you want to publish just a specific set of tables. To do so, create a publication with the name dbz_publication.

The connector currently doesn’t delete its replication slot when the connection is deleted in Decodable. Because that replication slot will never be consumed again, Postgres keeps retaining WAL for it, which can cause unbounded WAL growth in the source database. Therefore, if you delete this connection you should also manually delete the associated replication slot. To do so, run the following command in a Postgres session, replacing <slot_name> with the name of the slot (decodable_<connection-id>):

    SELECT pg_drop_replication_slot('<slot_name>');

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding Postgres data types.
| Postgres Type | Decodable Type |
|---|---|
| INT2, SMALLINT, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, INT4, INTEGER, SERIAL | INT |
| INT8, BIGINT, BIGSERIAL | BIGINT |
| FLOAT4, REAL | FLOAT |
| FLOAT8 | DOUBLE |
| DECIMAL(p, s), DEC(p, s), NUMERIC(p, s) | DECIMAL(p, s) |
| BOOL, BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME(p), TIMETZ(p) | TIME(p); TIME(6) if no precision provided |
| TIMESTAMP(p) | TIMESTAMP(p); TIMESTAMP(9) if no precision provided |
| TIMESTAMPTZ(p) | TIMESTAMP_LTZ(p); TIMESTAMP_LTZ(9) if no precision provided |
| BPCHAR | CHAR(2147483647) |
| BPCHAR(n), CHAR(n), CHARACTER(n), NCHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| JSON, JSONB, TEXT, XML | STRING |
| UUID | CHAR(36) |
| BIT, BYTEA, VARBIT | BYTES |
| ACLITEM, BOX, CID, CIDR, CIRCLE, GEOMETRY, GTSVECTOR, INET, INT2VECTOR, INTERVAL, JSONPATH, LINE, LSEG, MACADDR, MACADDR8, MONEY, NAME, OID, OIDVECTOR, PATH, POINT, POLYGON, REFCURSOR, TID, TSQUERY, TSVECTOR, TXID_SNAPSHOT, XID, XID8 | Not supported |
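
Before creating a connection, it can help to check whether any source columns use one of the types listed as Not supported above. The query below is a minimal sketch and not part of the connector. It assumes that the udt_name values in information_schema.columns match the lowercase forms of those type names, and it doesn't cover array columns. How the connector treats such columns isn't described here, so treat the result as a list of columns to review.

```sql
-- Sketch: list columns whose underlying type is in the "Not supported" group.
-- Assumption: udt_name holds the internal lowercase type name (e.g. money, inet).
-- Only columns visible to the current user are returned.
SELECT table_schema,
       table_name,
       column_name,
       udt_name
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
  AND udt_name IN (
        'aclitem', 'box', 'cid', 'cidr', 'circle', 'geometry', 'gtsvector',
        'inet', 'int2vector', 'interval', 'jsonpath', 'line', 'lseg',
        'macaddr', 'macaddr8', 'money', 'name', 'oid', 'oidvector', 'path',
        'point', 'polygon', 'refcursor', 'tid', 'tsquery', 'tsvector',
        'txid_snapshot', 'xid', 'xid8'
      )
ORDER BY table_schema, table_name, ordinal_position;
```

An empty result suggests that none of the visible columns use these types. To limit the check to the tables you plan to ingest, add a filter on table_schema and table_name.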