Triggers

The CDC connector is built entirely around listeners and services. A database-specific CDC listener (e.g., mysql:CdcListener) connects to the database and streams change events; a cdc:Service defines the callbacks that process those events. The ballerinax/cdc package provides the shared types, configuration records, and service interface that all database-specific listeners implement.

Three components work together:

| Component | Role |
|---|---|
| cdc:Listener | Abstract listener interface implemented by the database-specific listeners: mysql:CdcListener, postgresql:CdcListener, mssql:CdcListener, and oracledb:CdcListener. |
| cdc:Service | Ballerina service type that defines the event callbacks (onRead, onCreate, onUpdate, onDelete, onTruncate, onError) invoked for each change event. |
| @cdc:ServiceConfig | Optional annotation applied to a cdc:Service to restrict the tables it receives events from. |
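
The following minimal sketch shows the three components together. It assumes a MySQL source and reuses the finance_db.transactions table from the examples on this page; the service implements only onCreate and logs each inserted row.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

// Database-specific listener: connects to MySQL and streams change events.
listener mysql:CdcListener financeListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "finance_db",
        includedTables: "finance_db.transactions"
    }
);

// Service: callbacks invoked for each change event. The optional
// @cdc:ServiceConfig annotation restricts it to the listed table(s).
@cdc:ServiceConfig {
    tables: "finance_db.transactions"
}
service cdc:Service on financeListener {
    remote function onCreate(record {} after) returns cdc:Error? {
        log:printInfo("Transaction inserted", data = after.toString());
    }
}
```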

Listener

The cdc:Listener establishes the connection and manages event subscriptions.

Configuration

The listener is configured through three records:

| Config type | Description |
|---|---|
| DatabaseConnection | Core database connectivity and table/column filtering configuration, passed as the database parameter of the concrete listener constructor. |
| Options | Connector behavior and performance tuning options, passed as the options parameter of the listener constructor. |
| ListenerConfiguration | Top-level configuration, passed as the optional listenerConfig parameter, that controls engine identity, storage backends, and liveness checks. |

DatabaseConnection fields:

| Field | Type | Default | Description |
|---|---|---|---|
| hostname | string | Required | Hostname or IP address of the database server. |
| port | int | Required | Port number of the database server. |
| username | string | Required | Database username with CDC privileges. |
| password | string | Required | Password for the database user. |
| connectorClass | string | Required | Fully-qualified Debezium connector class name (set automatically by the database-specific packages). |
| includedTables | string\|string[] | Not set | Fully-qualified table name(s) to include in CDC events (e.g., "finance_db.transactions"). |
| excludedTables | string\|string[] | Not set | Fully-qualified table name(s) to exclude from CDC events. |
| includedColumns | string\|string[] | Not set | Column name(s) to include in change event payloads. |
| excludedColumns | string\|string[] | Not set | Column name(s) to exclude from change event payloads. |
| connectTimeout | decimal | Not set | Database connection timeout in seconds. |
| tasksMax | int | 1 | Maximum number of concurrent connector tasks. |
| secure | SecureDatabaseConnection | Not set | SSL/TLS configuration for the database connection. |

Database-specific listeners extend this record with their own fields, such as includedDatabases for MySQL, and can supply defaults for fields that are required here; the MySQL examples on this page omit hostname, port, and connectorClass for that reason.
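
A sketch of a DatabaseConnection that sets the connection and filtering fields explicitly. The hr_db database, its audit_log and employees tables, and the salary and ssn columns are hypothetical. The column filters are written as fully-qualified databaseName.tableName.columnName names, following Debezium's convention for column include/exclude lists; verify this against your connector version.

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string dbHost = "localhost";
configurable int dbPort = 3306;
configurable string username = ?;
configurable string password = ?;

// Attach a cdc:Service to this listener as shown in the Service section.
listener mysql:CdcListener hrListener = new (
    database = {
        hostname: dbHost,
        port: dbPort,
        username: username,
        password: password,
        includedDatabases: "hr_db",
        // Hypothetical: track every table in hr_db except the audit log.
        excludedTables: "hr_db.audit_log",
        // Hypothetical: keep sensitive columns out of event payloads.
        excludedColumns: ["hr_db.employees.salary", "hr_db.employees.ssn"],
        // Give up connecting to the database after 30 seconds.
        connectTimeout: 30
    }
);
```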

Options fields:

| Field | Type | Default | Description |
|---|---|---|---|
| snapshotMode | SnapshotMode | INITIAL | Controls snapshot behavior on connector startup: INITIAL, ALWAYS, NO_DATA, SCHEMA_ONLY, INITIAL_ONLY, RECOVERY, WHEN_NEEDED, CONFIGURATION_BASED, or CUSTOM. |
| eventProcessingFailureHandlingMode | EventProcessingFailureHandlingMode | WARN | How to handle event processing failures: FAIL (stop the connector), WARN (log and continue), or SKIP (silently skip the event). |
| skippedOperations | Operation[] | [TRUNCATE] | Operations to skip and not deliver to services. Values: CREATE, UPDATE, DELETE, TRUNCATE, NONE. |
| skipMessagesWithoutChange | boolean | false | When true, skips UPDATE events in which the row data did not actually change. |
| decimalHandlingMode | DecimalHandlingMode | DOUBLE | How DECIMAL/NUMERIC database columns are represented: PRECISE (byte array), DOUBLE, or STRING. |
| maxQueueSize | int | 8192 | Maximum number of events held in the connector's internal queue. |
| maxBatchSize | int | 2048 | Maximum number of events processed per batch. |
| queryTimeout | decimal | 60 | Timeout in seconds for database queries issued by the connector. |
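
A sketch of an Options record tuned for a ledger-style workload. It assumes the enum values listed above are exported from the cdc module under those names, as cdc:NO_DATA and cdc:TRUNCATE are in the examples below; cdc:INITIAL, cdc:FAIL, and cdc:STRING are therefore assumptions. Debezium recommends keeping maxQueueSize larger than maxBatchSize.

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener ledgerListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "finance_db",
        includedTables: "finance_db.transactions"
    },
    options = {
        // Snapshot existing rows once, then stream changes (assumed constant name).
        snapshotMode: cdc:INITIAL,
        // Stop the connector rather than continue past a failed event (assumed constant name).
        eventProcessingFailureHandlingMode: cdc:FAIL,
        // Ignore UPDATE events that did not actually change the row.
        skipMessagesWithoutChange: true,
        // Deliver DECIMAL/NUMERIC columns as strings to avoid floating-point rounding.
        decimalHandlingMode: cdc:STRING,
        // Keep the queue larger than a single batch.
        maxQueueSize: 16384,
        maxBatchSize: 4096
    }
);
```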

ListenerConfiguration fields:

| Field | Type | Default | Description |
|---|---|---|---|
| engineName | string | "ballerina-cdc-connector" | Unique name for the Debezium engine instance. Must be unique when running multiple listeners. |
| internalSchemaStorage | FileInternalSchemaStorage\|KafkaInternalSchemaStorage | {} | Storage backend for Debezium schema history. Defaults to file-based storage at tmp/dbhistory.dat. |
| offsetStorage | FileOffsetStorage\|KafkaOffsetStorage | {} | Storage backend for Debezium connector offsets (tracks which events have been processed). Defaults to file-based storage at tmp/debezium-offsets.dat. |
| options | Options | {} | Connector behavior options (same as the options parameter of the listener constructor). |
| livenessInterval | decimal | 60.0 | Interval in seconds between liveness heartbeat events used by cdc:isLive(). |
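
A sketch of a ListenerConfiguration that gives the engine its own name and moves schema history and offsets out of the shared tmp/ defaults. The orders_db database and the state/ file paths are hypothetical. Setting only fileName selects the file-based storage records; per-listener paths keep each engine's state separate when several listeners run in one program.

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener ordersListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "orders_db",
        includedTables: "orders_db.orders"
    },
    listenerConfig = {
        // Must be unique if other CDC listeners run in the same program.
        engineName: "orders-cdc",
        // Hypothetical per-listener paths for schema history and offsets.
        internalSchemaStorage: {fileName: "state/orders-dbhistory.dat"},
        offsetStorage: {fileName: "state/orders-offsets.dat"},
        // Liveness heartbeat interval, in seconds, used by cdc:isLive().
        livenessInterval: 30
    }
);
```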

Initializing the listener

MySQL, with no initial snapshot, insert-only tracking:

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener financeDBListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "finance_db",
        includedTables: "finance_db.transactions"
    },
    options = {
        // Skip the initial snapshot of existing rows.
        snapshotMode: cdc:NO_DATA,
        // Deliver only CREATE (insert) events.
        skippedOperations: [cdc:TRUNCATE, cdc:UPDATE, cdc:DELETE]
    }
);
```

MySQL, with multiple tables and Kafka-based offset and schema history storage:

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener mysqlListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "store_db",
        includedTables: ["store_db.products", "store_db.vendors", "store_db.product_reviews"]
    },
    options = {snapshotMode: cdc:NO_DATA},
    listenerConfig = {
        offsetStorage: {
            bootstrapServers: "kafka-broker:9092",
            topicName: "store_cdc_offsets"
        },
        internalSchemaStorage: {
            bootstrapServers: "kafka-broker:9092",
            topicName: "store_cdc_schema_history"
        }
    }
);
```

Service

A cdc:Service is attached to a CDC listener and implements remote function callbacks for each database change event type. Callbacks receive the row data as a record {} or as a user-defined typed record, and may optionally accept a trailing string tableName parameter (the second parameter for most callbacks, the third for onUpdate). Multiple services can be attached to the same listener, each scoped to specific tables with the @cdc:ServiceConfig annotation.
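
A sketch of a single unscoped service that handles two tables and branches on the trailing tableName parameter. It assumes tableName arrives in the same databaseName.tableName form used in includedTables; the store_db tables match the full example later on this page.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener storeListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "store_db",
        includedTables: ["store_db.products", "store_db.vendors"]
    }
);

service cdc:Service on storeListener {
    // tableName is the trailing parameter; for onCreate it is the second argument.
    remote function onCreate(record {} after, string tableName) returns cdc:Error? {
        match tableName {
            "store_db.products" => {
                log:printInfo("New product", data = after.toString());
            }
            "store_db.vendors" => {
                log:printInfo("New vendor", data = after.toString());
            }
            _ => {
                log:printWarn("Insert on unexpected table", tableName = tableName);
            }
        }
    }

    // For onUpdate, tableName comes third, after the before and after rows.
    remote function onUpdate(record {} before, record {} after, string tableName) returns cdc:Error? {
        log:printInfo("Row updated", tableName = tableName,
                before = before.toString(), after = after.toString());
    }
}
```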

Callback signatures

| Function | Signature | Description |
|---|---|---|
| onRead | remote function onRead(record {} after) returns cdc:Error? | Invoked for each existing row during the initial snapshot phase. |
| onRead (with table name) | remote function onRead(record {} after, string tableName) returns cdc:Error? | Variant of onRead that also receives the fully-qualified source table name. |
| onCreate | remote function onCreate(record {} after) returns cdc:Error? | Invoked when a new row is inserted into a tracked table. |
| onCreate (with table name) | remote function onCreate(record {} after, string tableName) returns cdc:Error? | Variant of onCreate that also receives the fully-qualified source table name. |
| onUpdate | remote function onUpdate(record {} before, record {} after) returns cdc:Error? | Invoked when a row is updated; receives both the pre-update and post-update row state. |
| onUpdate (with table name) | remote function onUpdate(record {} before, record {} after, string tableName) returns cdc:Error? | Variant of onUpdate that also receives the fully-qualified source table name. |
| onDelete | remote function onDelete(record {} before) returns cdc:Error? | Invoked when a row is deleted from a tracked table; receives the pre-deletion row state. |
| onDelete (with table name) | remote function onDelete(record {} before, string tableName) returns cdc:Error? | Variant of onDelete that also receives the fully-qualified source table name. |
| onTruncate | remote function onTruncate() returns cdc:Error? | Invoked when a table is truncated. Supported on PostgreSQL only. |
| onTruncate (with table name) | remote function onTruncate(string tableName) returns cdc:Error? | Variant of onTruncate that also receives the name of the truncated table. |
| onError | remote function onError(cdc:Error e) returns error? | Invoked when an event processing error occurs; receives the cdc:Error (including PayloadBindingError for type-binding failures). |
Note: You do not need to implement every callback; implement only the event types relevant to your use case. The listener silently ignores events for which no callback is implemented.
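
As a sketch of a partial implementation, the service below handles only UPDATE events and logs stock changes. The ProductStock type and its stock column are hypothetical. Comparing before with after relies on the complete pre-update row being present; Debezium's MySQL connector requires binlog_row_image=FULL, so this should hold for MySQL sources.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener inventoryListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "store_db",
        includedTables: "store_db.products"
    },
    options = {snapshotMode: cdc:NO_DATA}
);

// Hypothetical shape of a store_db.products row; other columns go to the rest field.
type ProductStock record {|
    int id;
    int stock;
    anydata...;
|};

service cdc:Service on inventoryListener {
    // Only UPDATE events are handled; other event types are ignored.
    remote function onUpdate(ProductStock before, ProductStock after) returns cdc:Error? {
        if before.stock != after.stock {
            log:printInfo("Stock changed", id = after.id,
                    oldStock = before.stock, newStock = after.stock);
        }
    }
}
```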

Full usage example

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener mysqlListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "store_db",
        includedTables: ["store_db.products", "store_db.vendors", "store_db.product_reviews"]
    },
    options = {snapshotMode: cdc:NO_DATA}
);

// Service scoped to the products and vendors tables
@cdc:ServiceConfig {
    tables: ["store_db.products", "store_db.vendors"]
}
service cdc:Service on mysqlListener {
    // Snapshot callback; not invoked while snapshotMode is cdc:NO_DATA.
    // Note: `table` is a reserved keyword, so the log key is `tableName`.
    remote function onRead(Entity after, string tableName) returns error? {
        log:printInfo("Snapshot row", tableName = tableName, data = after.toString());
    }

    remote function onCreate(Entity after, string tableName) returns error? {
        log:printInfo("Row inserted", tableName = tableName, data = after.toString());
    }

    remote function onUpdate(Entity before, Entity after, string tableName) returns error? {
        log:printInfo("Row updated", tableName = tableName,
                before = before.toString(), after = after.toString());
    }

    remote function onDelete(Entity before, string tableName) returns error? {
        log:printInfo("Row deleted", tableName = tableName, data = before.toString());
    }

    remote function onError(cdc:Error 'error) returns error? {
        log:printError("CDC error", 'error);
    }
}

// Separate service scoped to the product_reviews table
@cdc:ServiceConfig {
    tables: ["store_db.product_reviews"]
}
service cdc:Service on mysqlListener {
    remote function onCreate(ProductReviews after, string tableName) returns error? {
        log:printInfo("New review", tableName = tableName, data = after.toString());
    }

    remote function onError(cdc:Error 'error) returns error? {
        log:printError("CDC error", 'error);
    }
}

// Shared shape of products and vendors rows; extra columns go to the rest field.
type Entity record {|
    int id;
    string name;
    anydata...;
|};

type ProductReviews record {|
    int review_id;
    int product_id;
    string content;
    float rating;
|};
```
Note: Replace the generic record {} parameter type with a concrete Ballerina record type (such as Entity or ProductReviews in the example above) to get automatic payload binding and compile-time type safety. If an incoming payload cannot be bound to the declared type, a cdc:PayloadBindingError is raised and routed to onError, if that callback is implemented.


Supporting types

SecureDatabaseConnection

| Field | Type | Description |
|---|---|---|
| sslMode | SslMode | SSL enforcement level: DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY. Default: PREFERRED. |
| keyStore | crypto:KeyStore | Client certificate key store for mutual TLS authentication. |
| trustStore | crypto:TrustStore | Trust store used to verify the database server's SSL certificate. |
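
A sketch of a TLS-secured MySQL connection that verifies the server certificate against a PKCS#12 trust store. The host name, trust store path, and the cdc:VERIFY_CA constant name are assumptions. crypto:TrustStore is a record with path and password fields, so it is written here as a mapping literal without importing ballerina/crypto.

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string trustStorePassword = ?;

listener mysql:CdcListener secureListener = new (
    database = {
        // Hypothetical host name.
        hostname: "db.internal.example.com",
        username: username,
        password: password,
        includedDatabases: "finance_db",
        includedTables: "finance_db.transactions",
        secure: {
            // Require TLS and verify the server certificate (assumed constant name).
            sslMode: cdc:VERIFY_CA,
            // Hypothetical trust store holding the server's CA certificate.
            trustStore: {
                path: "resources/truststore.p12",
                password: trustStorePassword
            }
        }
    }
);
```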

CdcServiceConfig

| Field | Type | Description |
|---|---|---|
| tables | string\|string[] | One or more fully-qualified table names (e.g., "finance_db.transactions") that this service receives events for. Events from other tables are not delivered to this service. |

FileInternalSchemaStorage

| Field | Type | Description |
|---|---|---|
| className | string | Debezium schema history class. Default: io.debezium.storage.file.history.FileSchemaHistory. |
| fileName | string | Path to the local schema history file. Default: tmp/dbhistory.dat. |

KafkaInternalSchemaStorage

| Field | Type | Description |
|---|---|---|
| bootstrapServers | string\|string[] | Kafka bootstrap server address(es) for schema history storage. |
| topicName | string | Kafka topic name for schema history. Default: bal_cdc_internal_schema_history. |
| securityProtocol | kafka:SecurityProtocol | Kafka security protocol. Default: PROTOCOL_PLAINTEXT. |
| auth | kafka:AuthenticationConfiguration | Kafka authentication configuration for secured brokers. |
| secureSocket | kafka:SecureSocket | SSL/TLS configuration for the Kafka connection. |

FileOffsetStorage

| Field | Type | Description |
|---|---|---|
| className | string | Debezium offset storage class. Default: org.apache.kafka.connect.storage.FileOffsetBackingStore. |
| fileName | string | Path to the local offset storage file. Default: tmp/debezium-offsets.dat. |

KafkaOffsetStorage

| Field | Type | Description |
|---|---|---|
| bootstrapServers | string\|string[] | Kafka bootstrap server address(es) for offset storage. |
| topicName | string | Kafka topic name for offsets. Default: bal_cdc_offsets. |
| partitions | int | Number of partitions for the Kafka offset topic. Default: 1. |
| replicationFactor | int | Replication factor for the Kafka offset topic. Default: 2. |
| securityProtocol | kafka:SecurityProtocol | Kafka security protocol. Default: PROTOCOL_PLAINTEXT. |
| auth | kafka:AuthenticationConfiguration | Kafka authentication configuration for secured brokers. |
| secureSocket | kafka:SecureSocket | SSL/TLS configuration for the Kafka connection. |
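
A sketch of Kafka-backed offset storage on a secured cluster using SASL/PLAIN over TLS. The broker addresses, topic name, credentials, and CA certificate path are hypothetical, and the kafka:PROTOCOL_SASL_SSL and kafka:AUTH_SASL_PLAIN constants are assumed to match the ballerinax/kafka version the cdc package depends on. A topic's replication factor cannot exceed the number of brokers, so a single-broker development setup needs replicationFactor: 1 instead of the default 2.

```ballerina
import ballerinax/kafka;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;
configurable string kafkaUser = ?;
configurable string kafkaPassword = ?;

listener mysql:CdcListener storeListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "store_db",
        includedTables: "store_db.products"
    },
    listenerConfig = {
        offsetStorage: {
            // Hypothetical two-broker cluster.
            bootstrapServers: ["kafka-1:9093", "kafka-2:9093"],
            topicName: "store_cdc_offsets",
            partitions: 1,
            // Must not exceed the number of brokers.
            replicationFactor: 2,
            // SASL/PLAIN credentials sent over TLS.
            securityProtocol: kafka:PROTOCOL_SASL_SSL,
            auth: {
                mechanism: kafka:AUTH_SASL_PLAIN,
                username: kafkaUser,
                password: kafkaPassword
            },
            // Hypothetical CA certificate used to verify the brokers.
            secureSocket: {
                cert: "resources/kafka-ca.crt"
            }
        }
    }
);
```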

EventProcessingError

| Detail field | Type | Description |
|---|---|---|
| payload | json | The raw JSON payload of the event that caused the processing error. |

PayloadBindingError

| Detail field | Type | Description |
|---|---|---|
| payload | json | The raw JSON payload that could not be deserialized into the declared callback parameter type. |
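
A sketch of an onError callback that tells the two error types apart and logs the raw payload. It assumes PayloadBindingError and EventProcessingError are distinct subtypes of cdc:Error whose detail record declares payload as a required field, as the tables above suggest; the Transaction type and its columns are hypothetical.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener financeListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "finance_db",
        includedTables: "finance_db.transactions"
    }
);

// Hypothetical row shape; a payload that does not fit raises cdc:PayloadBindingError.
type Transaction record {|
    int tx_id;
    string account;
    anydata...;
|};

service cdc:Service on financeListener {
    remote function onCreate(Transaction after) returns cdc:Error? {
        log:printInfo("Transaction inserted", txId = after.tx_id);
    }

    remote function onError(cdc:Error e) returns error? {
        if e is cdc:PayloadBindingError {
            // The row could not be bound to Transaction; keep the raw JSON for inspection.
            log:printError("Payload binding failed", e,
                    payload = e.detail().payload.toJsonString());
        } else if e is cdc:EventProcessingError {
            log:printError("Event processing failed", e,
                    payload = e.detail().payload.toJsonString());
        } else {
            log:printError("CDC error", e);
        }
    }
}
```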