Skip to main content

CDC for PostgreSQL

PostgreSQL CDC integrations capture row-level changes from PostgreSQL tables in real time using Debezium-based Change Data Capture. Use them for data synchronization, audit logging, and event-driven workflows that must react to database inserts, updates, deletes, and truncates without polling.

note

Logical replication must be enabled on the PostgreSQL database and on the specific tables you want to track before creating this integration. See Prerequisites.

Prerequisites

Before creating the integration:

  • wal_level = logical must be set in postgresql.conf, and the server restarted. Without this, the connector cannot read change events from the write-ahead log.
  • The connecting user needs the REPLICATION attribute. Verify with SELECT rolname, rolreplication FROM pg_roles WHERE rolname = '<username>';.
  • Each tracked table needs REPLICA IDENTITY FULL so that before images are emitted for updates and deletes.
-- Set in postgresql.conf
wal_level = logical

-- Grant replication permission to the user
ALTER USER <username> REPLICATION;

-- Enable full replica identity on the table
ALTER TABLE <schema>.<table> REPLICA IDENTITY FULL;

The connector uses a logical replication slot (default name debezium) and a publication (default name dbz_publication); both are created automatically on first run. For advanced Debezium properties, see the Debezium connector for PostgreSQL.

Create a CDC service for PostgreSQL

  1. Click + Add Artifact in the canvas or click + next to Entry Points in the sidebar.

  2. In the Artifacts panel, select CDC for PostgreSQL under Event Integration.

  3. In the creation form, select Create new to configure a new listener.

    PostgreSQL CDC creation form: connection fields

    Under Listener Configurations, fill in the following fields:

    FieldDescriptionDefault
    HostHostname of the PostgreSQL server.localhost
    PortPort number of the PostgreSQL server.5432
    UsernameUsername for the PostgreSQL connection.Required
    PasswordPassword for the PostgreSQL connection.Required
    DatabaseName of the database to capture changes from.Required
    SchemasRegular expressions matching schema names to capture changes from. Click + Add Item to add each pattern.

    Expand Advanced Configurations for additional settings:

    FieldDescriptionDefault
    Listener NameIdentifier for the listener created with this service.postgresqlCdcListener
    Secure SocketSSL/TLS configuration for a secure connection.
    OptionsAdditional options for the CDC engine as a record expression. Common keys include snapshotMode (for example, cdc:NO_DATA to skip the initial snapshot) and skippedOperations (for example, [cdc:TRUNCATE, cdc:UPDATE, cdc:DELETE] to skip truncate, update, and delete events; note that snapshot reads still trigger onRead unless snapshotMode is also set to cdc:NO_DATA).

    Under Table, enter the fully qualified table name to capture events from in the format <database>.<schema>.<table> (for example, mydb.public.customers).

  4. Click Create.

  5. WSO2 Integrator opens the service in the Service Designer. The canvas shows the attached listener pill and the table name pill.

  6. Click + Add Handler to add event handlers.

    Service Designer showing the PostgreSQL CDC service canvas

Service configuration

In the Service Designer, click the Configure icon in the header to open the CDC for PostgreSQL Configuration panel. Select CDC for PostgreSQL in the left panel.

PostgreSQL CDC Configuration panel: service config and listener connection

FieldDescription
Service ConfigAdvanced CDC configuration as a record expression. The tables field sets the fully qualified table name (format: <database>.<schema>.<table>).

Listener configuration

In the CDC for PostgreSQL Configuration panel, select postgresqlCdcListener under Attached Listeners to configure the listener.

Listener configuration: Database, Engine Name, Internal Schema Storage, Offset Storage, Liveness Interval, Options

FieldDescriptionDefault
NameIdentifier for the listener.postgresqlCdcListener
DatabaseDatabase connection configuration as a record expression with hostname, port, username, password, and databaseName fields.Required
Engine NameDebezium engine instance name.ballerina-cdc-connector
Internal Schema StorageSchema history storage configuration.{fileName: "tmp/dbhistory.dat"}
Offset StorageOffset storage configuration for tracking CDC progress.{fileName: "tmp/debezium-offsets.dat"}
Liveness IntervalInterval in seconds for checking CDC listener liveness.60.0
OptionsAdditional connector options as a record expression.{}

Click + Attach Listener to attach an additional listener to the same service.

Click Save Changes to apply updates.

Replication slot and publication

The connector uses a logical replication slot and a publication on the source database. The defaults (debezium and dbz_publication) work for most cases. To change them or to control auto-creation, set replicationConfig and publicationConfig inside the Database field.

Event handlers

Adding an event handler

In the Service Designer, click + Add Handler. The Select Handler to Add panel lists onRead, onCreate, onUpdate, onDelete, onTruncate, and onError.

onRead, onCreate, onUpdate, and onDelete each open a configuration panel with a + Define Database Entry option to define the expected record type for the change event. Expand Advanced Parameters to find the TableName checkbox, which scopes the handler to a specific table. Click Save to add the handler.

onTruncate and onError are added directly without additional configuration.

onRead/onCreate/onUpdate/onDelete handler configuration panel

Truncate events

By default, TRUNCATE operations are in the skippedOperations list, so onTruncate is not invoked. To receive truncate events, remove cdc:TRUNCATE from skippedOperations (for example, set it to []) in Options.

Handler types

HandlerTriggered whenUse when
onReadA row is read during the initial snapshot of the tableBootstrapping downstream systems with existing data
onCreateA row is inserted into the tracked tableSyncing new records to downstream systems
onUpdateA row is updated in the tracked tablePropagating field changes
onDeleteA row is deleted from the tracked tableRemoving records from downstream systems
onTruncateThe tracked table is truncated (PostgreSQL only; skipped by default)Clearing or resetting downstream data
onErrorA CDC processing error occursLogging failures and sending alerts

What's next