Skip to main content

CDC for PostgreSQL

PostgreSQL CDC integrations capture row-level changes from PostgreSQL tables in real time using Debezium-based Change Data Capture. Use them for data synchronization, audit logging, and event-driven workflows that must react to database inserts, updates, deletes, and truncates without polling. This page covers creating the integration, configuring the service and listener, and adding event handlers for insert, update, delete, truncate and read events.

Prerequisites

Logical replication must be enabled on the PostgreSQL database and on each table you want to track before creating this integration. See the CDC connector setup guide for step-by-step instructions.

Create a CDC service for PostgreSQL

  1. Click + Add Artifact in the canvas or click + next to Entry Points in the sidebar.

  2. In the Artifacts panel, select CDC for PostgreSQL under Event Integration.

  3. In the creation form, select Create new to configure a new listener.

    PostgreSQL CDC creation form: connection fields

    Under Listener Configurations, fill in the following fields:

    FieldDescriptionDefault
    HostHostname of the PostgreSQL server.localhost
    PortPort number of the PostgreSQL server.5432
    UsernameUsername for the PostgreSQL connection.Required
    PasswordPassword for the PostgreSQL connection.Required
    DatabaseName of the database to capture changes from.Required
    SchemasRegular expressions matching schema names to capture changes from. Click + Add Item to add each pattern.

    Expand Advanced Configurations for additional settings:

    FieldDescriptionDefault
    Listener NameIdentifier for the listener created with this service.postgresqlCdcListener
    Secure SocketSSL/TLS configuration for a secure connection.
    OptionsAdditional options for the CDC engine as a record expression. Common keys include snapshotMode (for example, cdc:NO_DATA to skip the initial snapshot) and skippedOperations (for example, [cdc:TRUNCATE, cdc:UPDATE, cdc:DELETE] to skip truncate, update, and delete events; note that snapshot reads still trigger onRead unless snapshotMode is also set to cdc:NO_DATA).

    Under Table, enter the fully qualified table name to capture events from in the format <database>.<schema>.<table> (for example, mydb.public.customers).

  4. Click Create.

  5. WSO2 Integrator creates the empty service and opens it in the Service Designer. The canvas shows the attached listener pill and the table name pill. The service has no handlers yet.

    Service Designer showing the PostgreSQL CDC service canvas

Service configuration

In the Service Designer, click the Configure icon in the header to open the CDC for PostgreSQL Configuration panel. Select CDC for PostgreSQL in the left panel.

PostgreSQL CDC Configuration panel: service config and listener connection

FieldDescription
Service ConfigAdvanced CDC configuration as a record expression. The tables field sets the table(s) to capture changes from, using the fully qualified format <database>.<schema>.<table>. Provide a single table as a string (for example, "mydb.public.customers"), or multiple tables as a string array (for example, ["mydb.public.customers", "mydb.public.orders"]).

Listener configuration

In the CDC for PostgreSQL Configuration panel, select postgresqlCdcListener under Attached Listeners to configure the listener.

A single service can be attached to more than one listener. Attach multiple listeners when one service needs to process change events from more than one PostgreSQL source. For example, you can capture changes from two separate PostgreSQL instances, or from two databases with different connection settings, and route every event through the same handler logic.

Listener configuration: Database, Engine Name, Internal Schema Storage, Offset Storage, Liveness Interval, Options

FieldDescriptionDefault
NameIdentifier for the listener.postgresqlCdcListener
DatabaseDatabase connection configuration as a record expression with hostname, port, username, password, and databaseName fields.Required
Engine NameDebezium engine instance name.ballerina-cdc-connector
Internal Schema StorageSchema history storage configuration.{fileName: "tmp/dbhistory.dat"}
Offset StorageOffset storage configuration for tracking CDC progress.{fileName: "tmp/debezium-offsets.dat"}
Liveness IntervalInterval in seconds for checking CDC listener liveness.60.0
OptionsAdditional connector options as a record expression.{}

Click + Attach Listener to attach an additional listener to the same service.

Click Save Changes to apply updates.

Replication slot and publication

The connector uses a logical replication slot and a publication on the source database. The defaults (debezium and dbz_publication) work for most cases. To change them or to control auto-creation, set replicationConfig and publicationConfig inside the Database field.

Event handlers

Adding an event handler

In the Service Designer, click + Add Handler. The Select Handler to Add panel lists onRead, onCreate, onUpdate, onDelete, onTruncate, and onError.

onRead, onCreate, onUpdate, and onDelete each open a Message Handler Configuration panel for the row payload. onTruncate and onError are added directly without additional configuration.

Message Handler Configuration panel with Define Database Entry and Advanced Parameters TableName checkbox

The configuration panel exposes the following fields:

FieldDescription
+ Define Database EntryDefines the record type representing one row of the tracked table. The handler receives this record at runtime with values from the change event.
Advanced Parameters > TableNameScopes the handler to a specific table. This is selected by default so that the handler only runs for changes on the table it was added for. Clear the checkbox if you want the handler to run for changes on every table the service is attached to.

Click Save to add the handler.

Truncate events

By default, TRUNCATE operations are in the skippedOperations list, so onTruncate is not invoked. To receive truncate events, remove cdc:TRUNCATE from skippedOperations (for example, set it to []) in Options.

Handler types

HandlerTriggered whenUse when
onReadA row is read during the initial snapshot of the tableBootstrapping downstream systems with existing data
onCreateA row is inserted into the tracked tableSyncing new records to downstream systems
onUpdateA row is updated in the tracked tablePropagating field changes
onDeleteA row is deleted from the tracked tableRemoving records from downstream systems
onTruncateThe tracked table is truncated (PostgreSQL only; skipped by default)Clearing or resetting downstream data
onErrorA CDC processing error occursLogging failures and sending alerts

What's next