Triggers
The ballerinax/postgresql connector supports event-driven integration through Change Data Capture (CDC) powered by Debezium. When rows in monitored PostgreSQL tables are inserted, updated, or deleted, or are read during the initial snapshot, the postgresql:CdcListener receives the change events in real time and invokes the matching service callbacks automatically.
Three components work together:
| Component | Role |
|---|---|
| postgresql:CdcListener | Connects to PostgreSQL's logical replication stream through Debezium and streams row-level change events to attached services. |
| cdc:Service | Defines the onRead, onCreate, onUpdate, and onDelete callbacks invoked per change event. |
| PostgresDatabaseConnection | Configuration record for the PostgreSQL CDC database connection (host, port, credentials, schema/table filters, logical decoding plugin). |
For action-based record operations, see the Action Reference.
Listener
The postgresql:CdcListener establishes the connection and manages event subscriptions.
Configuration
The listener is configured through the following record types:
| Config Type | Description |
|---|---|
| PostgresDatabaseConnection | Configures the CDC database connection including server address, credentials, and schema or table filtering. |
| PostgresListenerConfiguration | Top-level listener configuration wrapping the database connection and CDC options. |
PostgresDatabaseConnection fields:
| Field | Type | Default | Description |
|---|---|---|---|
| connectorClass | string | "io.debezium.connector.postgresql.PostgresConnector" | The Debezium PostgreSQL connector class name. |
| hostname | string | "localhost" | The hostname of the PostgreSQL server. |
| port | int | 5432 | The port number of the PostgreSQL server. |
| username | string | Required | PostgreSQL username with the REPLICATION privilege. Inherited from cdc:DatabaseConnection. |
| password | string | Required | PostgreSQL password for the specified user. Inherited from cdc:DatabaseConnection. |
| databaseName | string | Required | Name of the database to capture changes from. |
| connectTimeout | decimal? | () | Connection timeout in seconds. Inherited from cdc:DatabaseConnection. |
| secure | cdc:SecureDatabaseConnection? | () | SSL/TLS configuration for the database connection. Inherited from cdc:DatabaseConnection. |
| includedSchemas | string\|string[]? | () | Regex patterns of schemas to include in capture. Mutually exclusive with excludedSchemas. |
| excludedSchemas | string\|string[]? | () | Regex patterns of schemas to exclude from capture. Mutually exclusive with includedSchemas. |
| includedTables | string\|string[]? | () | Fully-qualified table names or regex patterns to capture (for example, "public.customers"). Mutually exclusive with excludedTables. |
| excludedTables | string\|string[]? | () | Regex patterns of tables to exclude. Mutually exclusive with includedTables. |
| includedColumns | string\|string[]? | () | Regex patterns of columns to capture. Mutually exclusive with excludedColumns. |
| excludedColumns | string\|string[]? | () | Regex patterns of columns to exclude. Mutually exclusive with includedColumns. |
| messageKeyColumns | cdc:MessageKeyColumns? | () | Composite message-key column mappings for change events. |
| tasksMax | int | 1 | Maximum number of tasks. The PostgreSQL connector always uses a single task, so changing this has no effect. |
| pluginName | PostgreSQLLogicalDecodingPlugin | PGOUTPUT | Logical decoding plugin to use (PGOUTPUT or DECODERBUFS). Deprecated: use replicationConfig.pluginName instead. |
| slotName | string | "debezium" | Name of the logical replication slot. Deprecated: use replicationConfig.slotName instead. |
| publicationName | string | "dbz_publication" | Name of the PostgreSQL publication for the PGOUTPUT plugin. Deprecated: use publicationConfig.publicationName instead. |
| replicationConfig | ReplicationConfiguration? | () | Logical decoding plugin, slot name, and slot parameters. Takes priority over the deprecated top-level fields. |
| publicationConfig | PublicationConfiguration? | () | Publication name and autocreate mode. Takes priority over the deprecated top-level publicationName. |
| streamingConfig | StreamingConfiguration? | () | Streaming and status-update configuration (status update interval, xmin fetch interval, LSN flush mode). |
PostgresListenerConfiguration fields:
| Field | Type | Default | Description |
|---|---|---|---|
| database | PostgresDatabaseConnection | Required | The PostgreSQL CDC database connection configuration. |
| engineName | string | "ballerina-cdc-connector" | Debezium engine instance name. Inherited from cdc:ListenerConfiguration. |
| internalSchemaStorage | cdc:InternalSchemaStorage | {fileName: "tmp/dbhistory.dat"} | Schema-history storage configuration (file, Kafka, JDBC, Redis, S3, Azure Blob, RocketMQ, or in-memory). Inherited from cdc:ListenerConfiguration. |
| offsetStorage | cdc:OffsetStorage | {fileName: "tmp/debezium-offsets.dat"} | Offset storage configuration (file, Kafka, JDBC, Redis, or in-memory). Inherited from cdc:ListenerConfiguration. |
| livenessInterval | decimal | 60.0 | Interval, in seconds, for checking CDC listener liveness. Inherited from cdc:ListenerConfiguration. |
| options | PostgreSqlOptions | {} | PostgreSQL-specific CDC options including snapshotMode, skippedOperations, extendedSnapshot, data type handling, and heartbeat configs. |
Initializing the listener
Basic CDC listener with default settings:
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (database = {
    username: username,
    password: password,
    databaseName: database
});
CDC listener with schema and table filters:
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        hostname: "db.example.com",
        port: 5432,
        // Capture only these two tables in the public schema.
        includedSchemas: "public",
        includedTables: ["public.customers", "public.orders"],
        // pluginName and slotName are the deprecated top-level fields;
        // replicationConfig takes priority over them when it is set.
        pluginName: postgresql:PGOUTPUT,
        slotName: "my_slot"
    },
    options = {
        snapshotMode: cdc:NO_DATA,
        skippedOperations: [cdc:TRUNCATE]
    }
);
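The top-level fields listed under PostgresListenerConfiguration can be passed alongside database in the same constructor call. The following is a minimal sketch, assuming the named-argument style used in the examples above also applies to engineName and livenessInterval; the engine name shown is a hypothetical value.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database
    },
    // Hypothetical engine name, chosen only for illustration.
    engineName = "orders-cdc-engine",
    // Check listener liveness every 30 seconds instead of the default 60.
    livenessInterval = 30.0
);
```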
Service
A cdc:Service is a Ballerina service attached to a postgresql:CdcListener. It listens for row-level change events on monitored PostgreSQL tables and implements callbacks for each event type. You can type the callback parameters with your own Ballerina record types for automatic mapping.
Callback signatures
| Function | Signature | Description |
|---|---|---|
| onRead | remote function onRead(record {} after) returns cdc:Error? | Invoked during the initial snapshot for each existing row read from the database. |
| onCreate | remote function onCreate(record {} after) returns cdc:Error? | Invoked when a new row is inserted into a monitored table. |
| onUpdate | remote function onUpdate(record {} before, record {} after) returns cdc:Error? | Invoked when a row is updated, providing both the before and after state. |
| onDelete | remote function onDelete(record {} before) returns cdc:Error? | Invoked when a row is deleted, providing the row state before deletion. |
You do not need to implement all of these callbacks. Only implement the event types relevant to your use case.
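As an illustration of a partial implementation, the sketch below handles inserts only and uses the open record {} parameter type from the signature table instead of a custom record. The log message and key name are arbitrary choices for this example.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (database = {
    username: username,
    password: password,
    databaseName: database
});

service cdc:Service on cdcListener {
    // Only onCreate is implemented; snapshot reads, updates, and deletes
    // have no callback in this service.
    remote function onCreate(record {} after) returns cdc:Error? {
        log:printInfo("Row inserted", row = after.toString());
    }
}
```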
Full usage example
import ballerina/log;
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

// Column names of public.orders mapped to a closed record.
type Order record {|
    int order_id;
    int customer_id;
    decimal amount;
    string status;
|};

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        includedSchemas: "public",
        includedTables: "public.orders"
    },
    options = {
        snapshotMode: cdc:NO_DATA
    }
);

service cdc:Service on cdcListener {
    // With snapshotMode set to cdc:NO_DATA no existing rows are read, so this
    // callback only fires under a snapshot mode that reads data, such as cdc:INITIAL.
    isolated remote function onRead(Order after) returns cdc:Error? {
        log:printInfo("Snapshot row", order = after.toString());
    }

    isolated remote function onCreate(Order after) returns cdc:Error? {
        log:printInfo("New order created",
            orderId = after.order_id,
            amount = after.amount
        );
    }

    isolated remote function onUpdate(Order before, Order after) returns cdc:Error? {
        log:printInfo("Order updated",
            orderId = after.order_id,
            oldStatus = before.status,
            newStatus = after.status
        );
    }

    isolated remote function onDelete(Order before) returns cdc:Error? {
        log:printInfo("Order deleted", orderId = before.order_id);
    }
}
For CDC to work, PostgreSQL must be configured with wal_level = logical, and the connecting user must have the REPLICATION privilege. The pgoutput plugin is included by default in PostgreSQL 10 and later.
Supporting types
For the PostgresDatabaseConnection field reference, see the Listener > Configuration section above.
PostgreSQLLogicalDecodingPlugin
Logical decoding plugin used by Debezium to read PostgreSQL's WAL.
| Constant | Value | Description |
|---|---|---|
| PGOUTPUT | "pgoutput" | Standard PostgreSQL logical decoding plugin. Included by default in PostgreSQL 10 and later. |
| DECODERBUFS | "decoderbufs" | Protobuf-based logical decoding plugin from the Debezium community. Must be installed separately. |
ReplicationConfiguration
Replaces the deprecated top-level pluginName and slotName fields on PostgresDatabaseConnection. When set, takes priority over those fields.
| Field | Type | Default | Description |
|---|---|---|---|
| pluginName | PostgreSQLLogicalDecodingPlugin | PGOUTPUT | Logical decoding plugin to use. |
| slotName | string | "debezium" | Name of the PostgreSQL logical replication slot. |
| slotDropOnStop | boolean | false | Drop the replication slot when the connector stops. |
| slotStreamParams | string? | () | Custom replication slot stream parameters. |
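
A listener that sets the plugin and slot through replicationConfig rather than the deprecated top-level fields might look like the sketch below. The slot name is a hypothetical value, and slotDropOnStop is enabled only to show the field; dropping the slot on stop discards the stored replication position, so it generally suits development or test setups rather than production.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        replicationConfig: {
            pluginName: postgresql:PGOUTPUT,
            // Hypothetical slot name for illustration.
            slotName: "orders_slot",
            // Drops the slot when the connector stops (default is false).
            slotDropOnStop: true
        }
    }
);
```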
PublicationConfiguration
Replaces the deprecated top-level publicationName field on PostgresDatabaseConnection. When set, takes priority over that field. Applies when using the PGOUTPUT plugin.
| Field | Type | Default | Description |
|---|---|---|---|
| publicationName | string | "dbz_publication" | Name of the PostgreSQL publication used for streaming changes. |
| publicationAutocreateMode | PublicationAutocreateMode | ALL_TABLES | Whether and how Debezium auto-creates the publication. |
StreamingConfiguration
| Field | Type | Default | Description |
|---|---|---|---|
| statusUpdateInterval | decimal | 10 | Interval, in seconds, for sending status updates to PostgreSQL. |
| xminFetchInterval | decimal | 0 | Interval, in seconds, for fetching the current xmin position. 0 disables periodic fetching. |
| lsnFlushMode | LsnFlushMode? | () | LSN flushing strategy. |
PublicationAutocreateMode
| Constant | Value | Description |
|---|---|---|
| ALL_TABLES | "all_tables" | Auto-create a publication for all tables. |
| DISABLED | "disabled" | Do not auto-create publications. Requires manual setup in PostgreSQL. |
| FILTERED | "filtered" | Auto-create a publication restricted to the tables in the include/exclude filters. |
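
To show how the autocreate mode combines with table filters, the sketch below pairs FILTERED with includedTables so that the auto-created publication covers only the listed tables. The publication name is hypothetical, and the example assumes the constant is referenced as postgresql:FILTERED, in the same way postgresql:PGOUTPUT is referenced elsewhere on this page.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        // The include filter that the FILTERED publication is restricted to.
        includedTables: ["public.customers", "public.orders"],
        publicationConfig: {
            // Hypothetical publication name for illustration.
            publicationName: "orders_publication",
            // Assumed constant reference, by analogy with postgresql:PGOUTPUT.
            publicationAutocreateMode: postgresql:FILTERED
        }
    }
);
```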
LsnFlushMode
| Constant | Value | Description |
|---|---|---|
| MANUAL | "manual" | The user controls when LSN positions are flushed. |
| CONNECTOR | "connector" | The connector flushes LSN positions periodically. |
| CONNECTOR_AND_DRIVER | "connector_and_driver" | Both the connector and the JDBC driver flush LSN positions. |
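
A streamingConfig block using the fields above could be written as in the following sketch. The interval values are arbitrary illustrations, and postgresql:CONNECTOR is an assumed way of referencing the LsnFlushMode constant, by analogy with postgresql:PGOUTPUT.

```ballerina
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        streamingConfig: {
            // Send status updates every 5 seconds instead of the default 10.
            statusUpdateInterval: 5.0,
            // Keep periodic xmin fetching disabled (the default).
            xminFetchInterval: 0.0,
            // Assumed constant reference; the connector flushes LSN positions.
            lsnFlushMode: postgresql:CONNECTOR
        }
    }
);
```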
PostgreSqlOptions
PostgreSQL-specific CDC options. Set on PostgresListenerConfiguration.options. Includes all fields of cdc:Options plus PostgreSQL-specific configuration.
| Field | Type | Default | Description |
|---|---|---|---|
| snapshotMode | cdc:SnapshotMode | INITIAL | Initial snapshot behavior. Values: ALWAYS, INITIAL, INITIAL_ONLY, SCHEMA_ONLY, NO_DATA, RECOVERY, WHEN_NEEDED, CONFIGURATION_BASED, CUSTOM. |
| eventProcessingFailureHandlingMode | cdc:EventProcessingFailureHandlingMode | WARN | How to handle event-processing failures (FAIL, WARN, SKIP). |
| skippedOperations | cdc:Operation[] | [TRUNCATE] | Operations to skip when publishing change events. |
| skipMessagesWithoutChange | boolean | false | Discard events that contain no row-data changes. |
| decimalHandlingMode | cdc:DecimalHandlingMode | DOUBLE | Representation mode for decimal values (PRECISE, DOUBLE, STRING). |
| maxQueueSize | int | 8192 | Maximum number of events in the internal queue. |
| maxBatchSize | int | 2048 | Maximum number of events per processing batch. |
| queryTimeout | decimal | 60 | Database query timeout in seconds. 0 disables the timeout. |
| heartbeatConfig | cdc:HeartbeatConfiguration? | () | Heartbeat configuration for keeping the replication connection active. Inherited from cdc:Options. |
| signalConfig | cdc:SignalConfiguration? | () | Signal-channel configuration for ad-hoc control. Inherited from cdc:Options. |
| transactionMetadataConfig | cdc:TransactionMetadataConfiguration? | () | Transaction-boundary event configuration. Inherited from cdc:Options. |
| columnTransformConfig | cdc:ColumnTransformConfiguration? | () | Column masking and transformation configuration. Inherited from cdc:Options. |
| topicConfig | cdc:TopicConfiguration? | () | Topic naming and routing configuration. Inherited from cdc:Options. |
| connectionRetryConfig | cdc:ConnectionRetryConfiguration? | () | Connection retry behavior. Inherited from cdc:Options. |
| performanceConfig | cdc:PerformanceConfiguration? | () | Performance-tuning configuration. Inherited from cdc:Options. |
| extendedSnapshot | ExtendedSnapshotConfiguration? | () | PostgreSQL extended snapshot configuration (lock timeout, isolation mode). Narrows the parent cdc:Options.extendedSnapshot. |
| dataTypeConfig | cdc:DataTypeConfiguration? | () | Data-type handling configuration including schema-change tracking. |
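
Several of these options can be combined in the options argument of the listener. The sketch below is one possible combination, not a recommended baseline: the numeric values are arbitrary, and cdc:PRECISE and cdc:FAIL are assumed to be referenced with the cdc: prefix in the same way as cdc:NO_DATA and cdc:TRUNCATE.

```ballerina
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database
    },
    options = {
        // Snapshot existing rows once, then stream changes (the default).
        snapshotMode: cdc:INITIAL,
        skippedOperations: [cdc:TRUNCATE],
        // Assumed constant references for the enum values listed above.
        decimalHandlingMode: cdc:PRECISE,
        eventProcessingFailureHandlingMode: cdc:FAIL,
        // Illustrative sizes; the queue is kept larger than the batch.
        maxQueueSize: 4096,
        maxBatchSize: 1024
    }
);
```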
ExtendedSnapshotConfiguration
PostgreSQL-specific extension of cdc:RelationalExtendedSnapshotConfiguration.
| Field | Type | Default | Description |
|---|---|---|---|
| lockTimeout | decimal | 10 | Lock acquisition timeout in seconds during snapshot. |
| isolationMode | cdc:SnapshotIsolationMode? | () | Transaction isolation level used during the snapshot. |
cdc:SecureDatabaseConnection
SSL/TLS configuration for the CDC database connection. Set on PostgresDatabaseConnection.secure.
| Field | Type | Default | Description |
|---|---|---|---|
| sslMode | cdc:SslMode | PREFERRED | Connection security level (DISABLED, PREFERRED, REQUIRED, VERIFY_CA, VERIFY_IDENTITY). |
| keyStore | crypto:KeyStore? | () | Client keystore for mutual TLS authentication. |
| trustStore | crypto:TrustStore? | () | Truststore for verifying the server certificate. |
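
A TLS-verified connection could be configured as in the sketch below. It assumes cdc:VERIFY_CA is the way to reference that SslMode value, and it supplies the truststore as a record literal with the path and password fields of crypto:TrustStore. The trustStorePath and trustStorePassword configurables are hypothetical names introduced for this example.

```ballerina
import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string database = ?;
// Hypothetical configurables for the truststore location and password.
configurable string trustStorePath = ?;
configurable string trustStorePassword = ?;

listener postgresql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        databaseName: database,
        secure: {
            // Assumed constant reference; verifies the server certificate chain.
            sslMode: cdc:VERIFY_CA,
            trustStore: {
                path: trustStorePath,
                password: trustStorePassword
            }
        }
    }
);
```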
cdc:MessageKeyColumns
Defines a composite message key for a captured table.
| Field | Type | Default | Description |
|---|---|---|---|
| tableName | string | Required | Fully-qualified table name (for example, "public.orders"). |
| columns | string[] | Required | Column names that compose the message key. |