Triggers

The ballerinax/postgresql connector supports event-driven integration through Change Data Capture (CDC) powered by Debezium. When rows in monitored PostgreSQL tables are inserted, updated, or deleted, or are read during the initial snapshot, the postgresql:CdcListener receives the change events in real time and invokes your service callbacks automatically.

Three components work together:

| Component | Role |
| --- | --- |
| `postgresql:CdcListener` | Connects to PostgreSQL's logical replication stream through Debezium and streams row-level change events to attached services. |
| `cdc:Service` | Defines the `onRead`, `onCreate`, `onUpdate`, and `onDelete` callbacks invoked per change event. |
| `PostgresDatabaseConnection` | Configuration record for the PostgreSQL CDC database connection (host, port, credentials, schema/table filters, logical decoding plugin). |

For action-based record operations, see the Action Reference.


Listener

The postgresql:CdcListener establishes the connection and manages event subscriptions.

Configuration

The listener is configured through the following record types:

| Config Type | Description |
| --- | --- |
| `PostgresDatabaseConnection` | Configures the CDC database connection including server address, credentials, and schema or table filtering. |
| `PostgresListenerConfiguration` | Top-level listener configuration wrapping the database connection and CDC options. |

PostgresDatabaseConnection fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `connectorClass` | `string` | `"io.debezium.connector.postgresql.PostgresConnector"` | The Debezium PostgreSQL connector class name. |
| `hostname` | `string` | `"localhost"` | The hostname of the PostgreSQL server. |
| `port` | `int` | `5432` | The port number of the PostgreSQL server. |
| `username` | `string` | Required | PostgreSQL username with the REPLICATION privilege. Inherited from `cdc:DatabaseConnection`. |
| `password` | `string` | Required | PostgreSQL password for the specified user. Inherited from `cdc:DatabaseConnection`. |
| `databaseName` | `string` | Required | Name of the database to capture changes from. |
| `connectTimeout` | `decimal?` | `()` | Connection timeout in seconds. Inherited from `cdc:DatabaseConnection`. |
| `secure` | `cdc:SecureDatabaseConnection?` | `()` | SSL/TLS configuration for the database connection. Inherited from `cdc:DatabaseConnection`. |
| `includedSchemas` | `string\|string[]?` | `()` | Regex patterns of schemas to include in capture. Mutually exclusive with `excludedSchemas`. |
| `excludedSchemas` | `string\|string[]?` | `()` | Regex patterns of schemas to exclude from capture. Mutually exclusive with `includedSchemas`. |
| `includedTables` | `string\|string[]?` | `()` | Fully-qualified table names or regex patterns to capture (for example, `"public.customers"`). Mutually exclusive with `excludedTables`. |
| `excludedTables` | `string\|string[]?` | `()` | Regex patterns of tables to exclude. Mutually exclusive with `includedTables`. |
| `includedColumns` | `string\|string[]?` | `()` | Regex patterns of columns to capture. Mutually exclusive with `excludedColumns`. |
| `excludedColumns` | `string\|string[]?` | `()` | Regex patterns of columns to exclude. Mutually exclusive with `includedColumns`. |
| `messageKeyColumns` | `cdc:MessageKeyColumns?` | `()` | Composite message-key column mappings for change events. |
| `tasksMax` | `int` | `1` | Maximum number of tasks. The PostgreSQL connector always uses a single task, so changing this has no effect. |
| `pluginName` | `PostgreSQLLogicalDecodingPlugin` | `PGOUTPUT` | Logical decoding plugin to use (`PGOUTPUT` or `DECODERBUFS`). Deprecated: use `replicationConfig.pluginName` instead. |
| `slotName` | `string` | `"debezium"` | Name of the logical replication slot. Deprecated: use `replicationConfig.slotName` instead. |
| `publicationName` | `string` | `"dbz_publication"` | Name of the PostgreSQL publication for the `PGOUTPUT` plugin. Deprecated: use `publicationConfig.publicationName` instead. |
| `replicationConfig` | `ReplicationConfiguration?` | `()` | Logical decoding plugin, slot name, and slot parameters. Takes priority over the deprecated top-level fields. |
| `publicationConfig` | `PublicationConfiguration?` | `()` | Publication name and autocreate mode. Takes priority over the deprecated top-level `publicationName`. |
| `streamingConfig` | `StreamingConfiguration?` | `()` | Streaming and status-update configuration (status update interval, xmin fetch interval, LSN flush mode). |

PostgresListenerConfiguration fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `database` | `PostgresDatabaseConnection` | Required | The PostgreSQL CDC database connection configuration. |
| `engineName` | `string` | `"ballerina-cdc-connector"` | Debezium engine instance name. Inherited from `cdc:ListenerConfiguration`. |
| `internalSchemaStorage` | `cdc:InternalSchemaStorage` | `{fileName: "tmp/dbhistory.dat"}` | Schema-history storage configuration (file, Kafka, JDBC, Redis, S3, Azure Blob, RocketMQ, or in-memory). Inherited from `cdc:ListenerConfiguration`. |
| `offsetStorage` | `cdc:OffsetStorage` | `{fileName: "tmp/debezium-offsets.dat"}` | Offset storage configuration (file, Kafka, JDBC, Redis, or in-memory). Inherited from `cdc:ListenerConfiguration`. |
| `livenessInterval` | `decimal` | `60.0` | Interval, in seconds, for checking CDC listener liveness. Inherited from `cdc:ListenerConfiguration`. |
| `options` | `PostgreSqlOptions` | `{}` | PostgreSQL-specific CDC options including `snapshotMode`, `skippedOperations`, extended snapshot, data type handling, and heartbeat configs. |

Initializing the listener

Basic CDC listener with default settings:

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (database = {
    username: username,
    password: password,
    databaseName: database
});
```

CDC listener with schema and table filters:

```ballerina
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        hostname: "db.example.com",
        port: 5432,
        // Capture only two tables in the public schema.
        includedSchemas: "public",
        includedTables: ["public.customers", "public.orders"],
        // pluginName and slotName are the deprecated top-level fields;
        // replicationConfig takes priority over them when it is set.
        pluginName: postgresql:PGOUTPUT,
        slotName: "my_slot"
    },
    options = {
        snapshotMode: cdc:NO_DATA,
        skippedOperations: [cdc:TRUNCATE]
    }
);
```
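
The remaining PostgresListenerConfiguration fields can be passed alongside database in the same way. The following is a minimal sketch, assuming that a mapping containing only fileName selects file-based storage (as the documented defaults suggest). The listener name, engine name, and file paths are hypothetical placeholders.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener ordersListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database
    },
    // Hypothetical engine name; the documented default is "ballerina-cdc-connector".
    engineName = "orders-cdc-engine",
    // Hypothetical paths; the documented defaults live under tmp/.
    offsetStorage = {fileName: "data/orders-offsets.dat"},
    internalSchemaStorage = {fileName: "data/orders-schema-history.dat"},
    // Check listener liveness every 30 seconds instead of the default 60.
    livenessInterval = 30.0
);
```

Keeping the offset and schema-history files outside a temporary directory is one way to let the listener resume from its last recorded position after a restart.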

Service

A cdc:Service is a Ballerina service attached to a postgresql:CdcListener. It listens for row-level change events on monitored PostgreSQL tables and implements callbacks for each event type. You can type the callback parameters with your own Ballerina record types for automatic mapping.

Callback signatures

| Function | Signature | Description |
| --- | --- | --- |
| `onRead` | `remote function onRead(record {} after) returns cdc:Error?` | Invoked during the initial snapshot for each existing row read from the database. |
| `onCreate` | `remote function onCreate(record {} after) returns cdc:Error?` | Invoked when a new row is inserted into a monitored table. |
| `onUpdate` | `remote function onUpdate(record {} before, record {} after) returns cdc:Error?` | Invoked when a row is updated, providing both the before and after state. |
| `onDelete` | `remote function onDelete(record {} before) returns cdc:Error?` | Invoked when a row is deleted, providing the row state before deletion. |

Note: You do not need to implement all of these callbacks. Implement only the ones for the event types relevant to your use case.
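
As an illustration of a partial implementation, the sketch below attaches a service that handles inserts only and uses the generic record {} parameter from the signature table rather than a custom record type. The listener name and log message are placeholders.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener insertListener = new (database = {
    username: username,
    password: password,
    databaseName: database
});

service cdc:Service on insertListener {
    // Only inserts are handled here; onRead, onUpdate, and onDelete are omitted.
    remote function onCreate(record {} after) returns cdc:Error? {
        // The open record carries whatever columns the captured row has.
        log:printInfo("Row inserted", row = after.toString());
    }
}
```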

Full usage example

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

// Closed record matching the columns of public.orders.
type Order record {|
    int order_id;
    int customer_id;
    decimal amount;
    string status;
|};

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        includedSchemas: "public",
        includedTables: "public.orders"
    },
    options = {
        // NO_DATA skips the data snapshot, so onRead is not expected to fire
        // with this setting; use cdc:INITIAL to receive existing rows.
        snapshotMode: cdc:NO_DATA
    }
);

service cdc:Service on cdcListener {
    isolated remote function onRead(Order after) returns cdc:Error? {
        log:printInfo("Snapshot row", order = after.toString());
    }

    isolated remote function onCreate(Order after) returns cdc:Error? {
        log:printInfo("New order created",
            orderId = after.order_id,
            amount = after.amount
        );
    }

    isolated remote function onUpdate(Order before, Order after) returns cdc:Error? {
        log:printInfo("Order updated",
            orderId = after.order_id,
            oldStatus = before.status,
            newStatus = after.status
        );
    }

    isolated remote function onDelete(Order before) returns cdc:Error? {
        log:printInfo("Order deleted", orderId = before.order_id);
    }
}
```

Note: For CDC to work, PostgreSQL must be configured with wal_level = logical, and the connecting user must have the REPLICATION privilege. The pgoutput plugin is included by default in PostgreSQL 10 and later.


Supporting types

For the PostgresDatabaseConnection field reference, see the Listener > Configuration section above.

PostgreSQLLogicalDecodingPlugin

Logical decoding plugin used by Debezium to read PostgreSQL's WAL.

| Constant | Value | Description |
| --- | --- | --- |
| `PGOUTPUT` | `"pgoutput"` | Standard PostgreSQL logical decoding plugin. Included by default in PostgreSQL 10 and later. |
| `DECODERBUFS` | `"decoderbufs"` | Protobuf-based logical decoding plugin from the Debezium community. Must be installed separately. |

ReplicationConfiguration

Replaces the deprecated top-level pluginName and slotName fields on PostgresDatabaseConnection. When set, takes priority over those fields.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `pluginName` | `PostgreSQLLogicalDecodingPlugin` | `PGOUTPUT` | Logical decoding plugin to use. |
| `slotName` | `string` | `"debezium"` | Name of the PostgreSQL logical replication slot. |
| `slotDropOnStop` | `boolean` | `false` | Drop the replication slot when the connector stops. |
| `slotStreamParams` | `string?` | `()` | Custom replication slot stream parameters. |
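
As a sketch of the non-deprecated form, the plugin and slot can be set through replicationConfig instead of the top-level pluginName and slotName fields. The listener name and slot name below are hypothetical placeholders.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener slotListener = new (database = {
    username: username,
    password: password,
    databaseName: database,
    replicationConfig: {
        pluginName: postgresql:PGOUTPUT,
        // Hypothetical slot name; the documented default is "debezium".
        slotName: "orders_slot",
        // Keep the slot when the connector stops (the documented default).
        slotDropOnStop: false
    }
});
```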

PublicationConfiguration

Replaces the deprecated top-level publicationName field on PostgresDatabaseConnection. When set, takes priority over that field. Applies when using the PGOUTPUT plugin.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `publicationName` | `string` | `"dbz_publication"` | Name of the PostgreSQL publication used for streaming changes. |
| `publicationAutocreateMode` | `PublicationAutocreateMode` | `ALL_TABLES` | Whether and how Debezium auto-creates the publication. |

StreamingConfiguration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `statusUpdateInterval` | `decimal` | `10` | Interval, in seconds, for sending status updates to PostgreSQL. |
| `xminFetchInterval` | `decimal` | `0` | Interval, in seconds, for fetching the current xmin position. `0` disables periodic fetching. |
| `lsnFlushMode` | `LsnFlushMode?` | `()` | LSN flushing strategy. |

PublicationAutocreateMode

| Constant | Value | Description |
| --- | --- | --- |
| `ALL_TABLES` | `"all_tables"` | Auto-create a publication for all tables. |
| `DISABLED` | `"disabled"` | Do not auto-create publications. Requires manual setup in PostgreSQL. |
| `FILTERED` | `"filtered"` | Auto-create a publication restricted to the tables in the include/exclude filters. |
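
To show how the autocreate mode interacts with the table filters, the sketch below limits the auto-created publication to a single table. It assumes the constant is referenced as postgresql:FILTERED, by analogy with postgresql:PGOUTPUT. The listener and publication names are hypothetical.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener filteredListener = new (database = {
    username: username,
    password: password,
    databaseName: database,
    includedTables: "public.orders",
    publicationConfig: {
        // Hypothetical name; the documented default is "dbz_publication".
        publicationName: "orders_publication",
        // Assumed qualified name for the FILTERED constant.
        publicationAutocreateMode: postgresql:FILTERED
    }
});
```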

LsnFlushMode

| Constant | Value | Description |
| --- | --- | --- |
| `MANUAL` | `"manual"` | The user controls when LSN positions are flushed. |
| `CONNECTOR` | `"connector"` | The connector flushes LSN positions periodically. |
| `CONNECTOR_AND_DRIVER` | `"connector_and_driver"` | Both the connector and the JDBC driver flush LSN positions. |
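
A StreamingConfiguration might be supplied as in the following sketch. It assumes the flush-mode constant is referenced as postgresql:CONNECTOR. The listener name and interval values are illustrative, not recommendations.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener streamingListener = new (database = {
    username: username,
    password: password,
    databaseName: database,
    streamingConfig: {
        // Send status updates every 5 seconds instead of the default 10.
        statusUpdateInterval: 5,
        // 0 leaves periodic xmin fetching disabled (the documented default).
        xminFetchInterval: 0,
        // Assumed qualified name for the CONNECTOR constant.
        lsnFlushMode: postgresql:CONNECTOR
    }
});
```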

PostgreSqlOptions

PostgreSQL-specific CDC options. Set on PostgresListenerConfiguration.options. Includes all fields of cdc:Options plus PostgreSQL-specific configuration.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `snapshotMode` | `cdc:SnapshotMode` | `INITIAL` | Initial snapshot behavior. Values: `ALWAYS`, `INITIAL`, `INITIAL_ONLY`, `SCHEMA_ONLY`, `NO_DATA`, `RECOVERY`, `WHEN_NEEDED`, `CONFIGURATION_BASED`, `CUSTOM`. |
| `eventProcessingFailureHandlingMode` | `cdc:EventProcessingFailureHandlingMode` | `WARN` | How to handle event-processing failures (`FAIL`, `WARN`, `SKIP`). |
| `skippedOperations` | `cdc:Operation[]` | `[TRUNCATE]` | Operations to skip when publishing change events. |
| `skipMessagesWithoutChange` | `boolean` | `false` | Discard events that contain no row-data changes. |
| `decimalHandlingMode` | `cdc:DecimalHandlingMode` | `DOUBLE` | Representation mode for decimal values (`PRECISE`, `DOUBLE`, `STRING`). |
| `maxQueueSize` | `int` | `8192` | Maximum number of events in the internal queue. |
| `maxBatchSize` | `int` | `2048` | Maximum number of events per processing batch. |
| `queryTimeout` | `decimal` | `60` | Database query timeout in seconds. `0` disables the timeout. |
| `heartbeatConfig` | `cdc:HeartbeatConfiguration?` | `()` | Heartbeat configuration for keeping the replication connection active. Inherited from `cdc:Options`. |
| `signalConfig` | `cdc:SignalConfiguration?` | `()` | Signal-channel configuration for ad-hoc control. Inherited from `cdc:Options`. |
| `transactionMetadataConfig` | `cdc:TransactionMetadataConfiguration?` | `()` | Transaction-boundary event configuration. Inherited from `cdc:Options`. |
| `columnTransformConfig` | `cdc:ColumnTransformConfiguration?` | `()` | Column masking and transformation configuration. Inherited from `cdc:Options`. |
| `topicConfig` | `cdc:TopicConfiguration?` | `()` | Topic naming and routing configuration. Inherited from `cdc:Options`. |
| `connectionRetryConfig` | `cdc:ConnectionRetryConfiguration?` | `()` | Connection retry behavior. Inherited from `cdc:Options`. |
| `performanceConfig` | `cdc:PerformanceConfiguration?` | `()` | Performance-tuning configuration. Inherited from `cdc:Options`. |
| `extendedSnapshot` | `ExtendedSnapshotConfiguration?` | `()` | PostgreSQL extended snapshot configuration (lock timeout, isolation mode). Narrows the parent `cdc:Options.extendedSnapshot`. |
| `dataTypeConfig` | `cdc:DataTypeConfiguration?` | `()` | Data-type handling configuration including schema-change tracking. |
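
The sketch below shows how a few of these options might be combined on the listener. It assumes the enum members are referenced as cdc:FAIL and cdc:PRECISE, following the cdc:NO_DATA and cdc:TRUNCATE pattern used elsewhere on this page. The listener name and the queue and batch sizes are illustrative only.

```ballerina
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener tunedListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database
    },
    options = {
        // Read existing rows first, then stream changes (the documented default).
        snapshotMode: cdc:INITIAL,
        // Assumed constant names for the FAIL and PRECISE values.
        eventProcessingFailureHandlingMode: cdc:FAIL,
        decimalHandlingMode: cdc:PRECISE,
        // Illustrative sizes; the queue is kept larger than the batch.
        maxQueueSize: 16384,
        maxBatchSize: 4096
    }
);
```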

ExtendedSnapshotConfiguration

PostgreSQL-specific extension of cdc:RelationalExtendedSnapshotConfiguration.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `lockTimeout` | `decimal` | `10` | Lock acquisition timeout in seconds during snapshot. |
| `isolationMode` | `cdc:SnapshotIsolationMode?` | `()` | Transaction isolation level used during the snapshot. |

cdc:SecureDatabaseConnection

SSL/TLS configuration for the CDC database connection. Set on PostgresDatabaseConnection.secure.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `sslMode` | `cdc:SslMode` | `PREFERRED` | Connection security level (`DISABLED`, `PREFERRED`, `REQUIRED`, `VERIFY_CA`, `VERIFY_IDENTITY`). |
| `keyStore` | `crypto:KeyStore?` | `()` | Client keystore for mutual TLS authentication. |
| `trustStore` | `crypto:TrustStore?` | `()` | Truststore for verifying the server certificate. |
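
A TLS-verified connection might be configured as in the following sketch. It assumes the SSL mode constant is referenced as cdc:VERIFY_CA and relies on crypto:TrustStore being a record with path and password fields. The listener name and the two truststore configurables are hypothetical.

```ballerina
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;
// Hypothetical configurables for the truststore location and password.
configurable string trustStorePath = ?;
configurable string trustStorePassword = ?;

listener postgresql:CdcListener secureListener = new (database = {
    username: username,
    password: password,
    databaseName: database,
    secure: {
        // Assumed constant name for the VERIFY_CA value.
        sslMode: cdc:VERIFY_CA,
        // Truststore used to verify the server certificate.
        trustStore: {path: trustStorePath, password: trustStorePassword}
    }
});
```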

cdc:MessageKeyColumns

Defines a composite message key for a captured table.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `tableName` | `string` | Required | Fully-qualified table name (for example, `"public.orders"`). |
| `columns` | `string[]` | Required | Column names that compose the message key. |