Triggers

The ballerinax/mysql connector supports event-driven integration through Change Data Capture (CDC) powered by Debezium. When rows in monitored MySQL tables are inserted, updated, or deleted, or are read during the initial snapshot, the mysql:CdcListener receives the change events in real time and invokes your service callbacks automatically.

Three components work together:

| Component | Role |
| --- | --- |
| mysql:CdcListener | Connects to the MySQL binary log via Debezium and streams row-level change events to attached services. |
| cdc:Service | Defines the onRead, onCreate, onUpdate, and onDelete callbacks invoked per change event. |
| MySqlDatabaseConnection | Configuration record for the MySQL CDC database connection (host, port, credentials, database/table filters). |

For action-based record operations, see the Action Reference.


Listener

The mysql:CdcListener establishes the connection and manages event subscriptions.

Configuration

The listener is configured through the following record types:

| Config Type | Description |
| --- | --- |
| MySqlDatabaseConnection | Configures the CDC database connection including server address, credentials, and table filtering. |
| MySqlListenerConfiguration | Top-level listener configuration wrapping the database connection and CDC options. |

MySqlDatabaseConnection fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| connectorClass | string | "io.debezium.connector.mysql.MySqlConnector" | The Debezium MySQL connector class name. |
| hostname | string | "localhost" | The hostname of the MySQL server. |
| port | int | 3306 | The port number of the MySQL server. |
| username | string | Required | MySQL username with replication privileges. |
| password | string | Required | MySQL password for the specified user. |
| connectTimeout | decimal? | () | Connection timeout in seconds. Inherited from cdc:DatabaseConnection. |
| secure | cdc:SecureDatabaseConnection? | () | SSL/TLS configuration for the database connection. Inherited from cdc:DatabaseConnection. |
| databaseServerId | string | auto-generated random integer (as a string) | Unique identifier for this MySQL server instance in the replication topology. |
| includedDatabases | string\|string[]? | () | Regex patterns of databases to capture changes from. Do not use alongside excludedDatabases. |
| excludedDatabases | string\|string[]? | () | Regex patterns of databases to exclude from capture. Do not use alongside includedDatabases. |
| includedTables | string\|string[]? | () | Fully-qualified table names or regex patterns to capture (for example, "mydb.orders"). Mutually exclusive with excludedTables. |
| excludedTables | string\|string[]? | () | Regex patterns of tables to exclude. Mutually exclusive with includedTables. |
| includedColumns | string\|string[]? | () | Regex patterns of columns to capture. Mutually exclusive with excludedColumns. |
| excludedColumns | string\|string[]? | () | Regex patterns of columns to exclude. Mutually exclusive with includedColumns. |
| messageKeyColumns | cdc:MessageKeyColumns? | () | Composite message key columns for change events. |
| tasksMax | int | 1 | Maximum number of tasks for the connector. The MySQL connector always uses a single task, so changing this has no effect. |
| replicationConfig | ReplicationConfiguration? | () | MySQL GTID-based replication configuration (gtidSourceIncludes/gtidSourceExcludes). |
| binlogConfig | BinlogConfiguration? | () | MySQL binlog configuration (for example, bufferSize). |
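
As a sketch of how several of these fields combine, the listener below pins the server ID and uses the array form of the table filter together with a column exclusion. The host name, the server ID value, and the shop.* table and column names are illustrative assumptions, not values taken from the connector.

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener filteredListener = new (database = {
    // Hypothetical host; replace with the address of your MySQL server.
    hostname: "mysql.internal.example",
    port: 3306,
    username: username,
    password: password,
    // Illustrative fixed ID instead of the auto-generated one.
    databaseServerId: "184054",
    // Array form of the filter: capture only these two (hypothetical) tables.
    includedTables: ["shop.orders", "shop.customers"],
    // Hypothetical column left out of change events.
    // excludedColumns is set here, so includedColumns must stay unset.
    excludedColumns: ["shop.customers.email"]
});
```

Because each included/excluded pair is mutually exclusive, the sketch sets only one side of every pair.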

MySqlListenerConfiguration fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| database | MySqlDatabaseConnection | Required | The MySQL CDC database connection configuration. |
| options | MySqlOptions | {} | Advanced CDC options including snapshotMode, skippedOperations, MySQL-specific extended snapshot, data type handling, and heartbeat configs. |
| engineName | string | "ballerina-cdc-connector" | Debezium engine instance name. Inherited from cdc:ListenerConfiguration. |
| internalSchemaStorage | cdc:InternalSchemaStorage | {fileName: "tmp/dbhistory.dat"} | Schema-history storage configuration (file, Kafka, JDBC, Redis, S3, Azure Blob, RocketMQ, or in-memory). Inherited from cdc:ListenerConfiguration. |
| offsetStorage | cdc:OffsetStorage | {fileName: "tmp/debezium-offsets.dat"} | Offset storage configuration (file, Kafka, JDBC, Redis, or in-memory). Inherited from cdc:ListenerConfiguration. |
| livenessInterval | decimal | 60.0 | Interval in seconds for checking CDC listener liveness. Inherited from cdc:ListenerConfiguration. |
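
The top-level fields can be overridden alongside database, as in the following minimal sketch. The engine name and file paths are hypothetical, and the sketch assumes that file-based storage is selected by passing a record with only fileName, mirroring the default values listed in the table.

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener ordersListener = new (
    database = {
        username: username,
        password: password
    },
    // Hypothetical engine name, distinct from the default "ballerina-cdc-connector".
    engineName = "orders-cdc-engine",
    // Hypothetical file paths (assumption: a fileName-only record selects file storage).
    offsetStorage = {fileName: "data/orders-offsets.dat"},
    internalSchemaStorage = {fileName: "data/orders-dbhistory.dat"},
    // Check listener liveness every 30 seconds instead of the default 60.
    livenessInterval = 30.0
);
```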

Initializing the listener

Basic CDC listener with default settings:

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener cdcListener = new (database = {
    username: username,
    password: password
});
```

CDC listener with database and table filters:

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "mydb",
        includedTables: "mydb.orders"
    },
    options = {
        snapshotMode: cdc:NO_DATA,
        skippedOperations: [cdc:TRUNCATE]
    }
);
```

Service

A cdc:Service is a Ballerina service attached to a mysql:CdcListener. It listens for row-level change events on monitored MySQL tables and implements callbacks for each event type. You can type the callback parameters with your own Ballerina record types for automatic mapping.

Callback signatures

| Function | Signature | Description |
| --- | --- | --- |
| onRead | remote function onRead(record {} after) returns cdc:Error? | Invoked during the initial snapshot for each existing row read from the database. |
| onCreate | remote function onCreate(record {} after) returns cdc:Error? | Invoked when a new row is inserted into a monitored table. |
| onUpdate | remote function onUpdate(record {} before, record {} after) returns cdc:Error? | Invoked when a row is updated, providing both the before and after state. |
| onDelete | remote function onDelete(record {} before) returns cdc:Error? | Invoked when a row is deleted, providing the row state before deletion. |
| onError | remote function onError(cdc:Error err) returns cdc:Error? | Invoked when the listener encounters an error during change-event delivery (for example, deserialization failures or connector errors). |

Note: You do not need to implement all of these callbacks. Only implement the event types relevant to your use case.
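
To illustrate a partial implementation, the sketch below defines only onCreate and onError, using the untyped record {} signatures from the table. The log messages and the row key are illustrative choices.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener cdcListener = new (database = {
    username: username,
    password: password
});

service cdc:Service on cdcListener {
    // Only inserts are handled here; no other event callbacks are implemented.
    remote function onCreate(record {} after) returns cdc:Error? {
        log:printInfo("Row inserted", row = after.toString());
    }

    // Logs errors raised during change-event delivery.
    remote function onError(cdc:Error err) returns cdc:Error? {
        log:printError("CDC event delivery failed", err);
    }
}
```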

Full usage example

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

// Maps the columns of the shop.orders table.
type Order record {|
    int order_id;
    int customer_id;
    decimal amount;
    string status;
|};

listener mysql:CdcListener cdcListener = new (
    database = {
        username: username,
        password: password,
        includedDatabases: "shop",
        includedTables: "shop.orders"
    },
    options = {
        snapshotMode: cdc:NO_DATA
    }
);

service cdc:Service on cdcListener {
    // Handles rows read during the initial snapshot. NO_DATA is expected to skip
    // existing rows; use a mode such as cdc:INITIAL to receive them here.
    isolated remote function onRead(Order after) returns cdc:Error? {
        // The key is named `row` because `order` is a Ballerina keyword.
        log:printInfo("Snapshot row", row = after.toString());
    }

    isolated remote function onCreate(Order after) returns cdc:Error? {
        log:printInfo("New order created",
            orderId = after.order_id,
            amount = after.amount
        );
    }

    isolated remote function onUpdate(Order before, Order after) returns cdc:Error? {
        log:printInfo("Order updated",
            orderId = after.order_id,
            oldStatus = before.status,
            newStatus = after.status
        );
    }

    isolated remote function onDelete(Order before) returns cdc:Error? {
        log:printInfo("Order deleted", orderId = before.order_id);
    }
}
```

Note: For CDC to work, MySQL must have binary logging enabled with binlog-format=ROW. Additionally, import ballerinax/mysql.cdc.driver as _ so that the required Debezium drivers are bundled with your application.


Supporting types

For the MySqlDatabaseConnection field reference, see the Listener > Configuration section above.

MySqlOptions

CDC options for the listener. Extends cdc:Options with MySQL-specific fields.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| snapshotMode | cdc:SnapshotMode | INITIAL | Initial snapshot behavior (INITIAL, ALWAYS, NO_DATA, WHEN_NEEDED, SCHEMA_ONLY, RECOVERY, and so on). |
| eventProcessingFailureHandlingMode | cdc:EventProcessingFailureHandlingMode | WARN | How to handle event-processing failures (FAIL, WARN, SKIP). |
| skippedOperations | cdc:Operation[] | [TRUNCATE] | Operations to skip publishing. |
| skipMessagesWithoutChange | boolean | false | Discard events that contain no row-data changes. |
| decimalHandlingMode | cdc:DecimalHandlingMode | DOUBLE | Representation mode for decimal values. |
| maxQueueSize | int | 8192 | Maximum number of events in the internal queue. |
| maxBatchSize | int | 2048 | Maximum number of events per processing batch. |
| queryTimeout | decimal | 60.0 | Database query timeout in seconds. |
| heartbeatConfig | cdc:RelationalHeartbeatConfiguration? | () | Heartbeat for keeping the MySQL replication connection alive. |
| signalConfig | cdc:SignalConfiguration? | () | Signal-channel configuration for ad-hoc control. |
| transactionMetadataConfig | cdc:TransactionMetadataConfiguration? | () | Transaction-boundary event configuration. |
| columnTransformConfig | cdc:ColumnTransformConfiguration? | () | Column masking/transformation configuration. |
| topicConfig | cdc:TopicConfiguration? | () | Topic naming and routing configuration. |
| connectionRetryConfig | cdc:ConnectionRetryConfiguration? | () | Error handling and retry configuration. |
| performanceConfig | cdc:PerformanceConfiguration? | () | Performance-tuning configuration. |
| extendedSnapshot | ExtendedSnapshotConfiguration? | () | MySQL extended snapshot configuration (for example, lockTimeout). Narrows the parent cdc:Options.extendedSnapshot. |
| dataTypeConfig | DataTypeConfiguration? | () | MySQL-specific type handling: bigIntUnsignedHandlingMode, enableTimeAdjuster, includeSchemaChanges. |
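
A minimal sketch of passing some of these options to the listener follows. The numeric values are illustrative, not tuning recommendations; the queue size is kept larger than the batch size, as Debezium expects.

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener tunedListener = new (
    database = {
        username: username,
        password: password
    },
    options = {
        // Snapshot behavior; INITIAL is the documented default.
        snapshotMode: cdc:INITIAL,
        // Do not publish TRUNCATE operations.
        skippedOperations: [cdc:TRUNCATE],
        // Drop events that carry no row-data changes.
        skipMessagesWithoutChange: true,
        // Illustrative sizes: double the documented defaults.
        maxQueueSize: 16384,
        maxBatchSize: 4096,
        // Database query timeout in seconds.
        queryTimeout: 30.0
    }
);
```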

cdc:SecureDatabaseConnection

SSL/TLS configuration for the CDC database connection. Set on MySqlDatabaseConnection.secure.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| sslMode | cdc:SslMode | PREFERRED | Connection security level (DISABLED, PREFERRED, REQUIRED, VERIFY_CA, VERIFY_IDENTITY). |
| keyStore | crypto:KeyStore? | () | Client keystore for mutual TLS authentication. |
| trustStore | crypto:TrustStore? | () | Truststore for verifying the server certificate. |
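
The following sketch shows one way to require a verified TLS connection. The host name and the configurable truststore variables are hypothetical names introduced for illustration; the path and password fields are those of the crypto:TrustStore record.

```ballerina
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;
// Hypothetical configurables that point to a truststore file.
configurable string trustStorePath = ?;
configurable string trustStorePassword = ?;

listener mysql:CdcListener secureListener = new (database = {
    // Hypothetical host; replace with the address of your MySQL server.
    hostname: "mysql.internal.example",
    username: username,
    password: password,
    secure: {
        // One of the documented sslMode values.
        sslMode: cdc:VERIFY_CA,
        // Truststore used to verify the server certificate.
        trustStore: {
            path: trustStorePath,
            password: trustStorePassword
        }
    }
});
```

For mutual TLS, a keyStore with the same path and password shape would be added next to trustStore.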

ReplicationConfiguration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| gtidSourceIncludes | string\|string[]? | () | Comma-separated list of GTID source UUIDs to include. |
| gtidSourceExcludes | string\|string[]? | () | Comma-separated list of GTID source UUIDs to exclude. |

BinlogConfiguration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| bufferSize | int | 8192 | Size of the binlog buffer in bytes. |
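
ReplicationConfiguration and BinlogConfiguration are both set on the database connection through replicationConfig and binlogConfig. The sketch below combines them; the GTID source UUID is a placeholder and the buffer size is an illustrative value.

```ballerina
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

configurable string username = ?;
configurable string password = ?;

listener mysql:CdcListener gtidListener = new (database = {
    username: username,
    password: password,
    replicationConfig: {
        // Placeholder UUID; use the UUID of the GTID source to include.
        gtidSourceIncludes: "3e11fa47-71ca-11e1-9e33-c80aa9429562"
    },
    binlogConfig: {
        // Illustrative value: double the documented default of 8192 bytes.
        bufferSize: 16384
    }
});
```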