Triggers
The ballerinax/mssql connector supports event-driven integration through Debezium-based Change Data Capture (CDC). When records are inserted, updated, deleted, or read during the initial snapshot in CDC-enabled tables, the mssql:CdcListener receives change events in real time and invokes your service callbacks automatically.
Three components work together:
| Component | Role |
|---|---|
mssql:CdcListener | Connects to MSSQL via Debezium and streams change events from CDC-enabled tables. |
cdc:Service | Defines the onRead, onCreate, onUpdate, onDelete, and onError callbacks invoked per event. |
MsSqlDatabaseConnection | Configuration record for the MSSQL CDC database connection (host, port, credentials, schema and table filters). |
For action-based record operations, see the Action Reference.
Listener
The mssql:CdcListener establishes the connection and manages event subscriptions.
Configuration
The listener supports the following connection strategies:
| Config Type | Description |
|---|---|
MsSqlDatabaseConnection | Configures the CDC database connection including server address, credentials, schema and table filtering, and column filtering. |
MsSqlListenerConfiguration | Top-level listener configuration wrapping the database connection and CDC options. |
MsSqlDatabaseConnection fields:
| Field | Type | Default | Description |
|---|---|---|---|
connectorClass | string | "io.debezium.connector.sqlserver.SqlServerConnector" | The Debezium SQL Server connector class name. |
hostname | string | "localhost" | Hostname of the MSSQL server. |
port | int | 1433 | Port number of the MSSQL server. |
username | string | Required | Database username for the CDC connection. Inherited from cdc:DatabaseConnection. |
password | string | Required | Database password for the CDC connection. Inherited from cdc:DatabaseConnection. |
databaseNames | string|string[] | Required | Name(s) of the database(s) to capture changes from. |
connectTimeout | decimal? | () | Connection timeout in seconds. Inherited from cdc:DatabaseConnection. |
secure | cdc:SecureDatabaseConnection? | () | SSL/TLS configuration for the database connection. Inherited from cdc:DatabaseConnection. |
databaseInstance | string? | () | Named SQL Server instance, if applicable. |
includedSchemas | string|string[]? | () | Schema(s) to include in CDC capture (for example, "dbo"). Mutually exclusive with excludedSchemas. |
excludedSchemas | string|string[]? | () | Schema(s) to exclude from CDC capture. Mutually exclusive with includedSchemas. |
includedTables | string|string[]? | () | Table identifiers in schema.table format, or regex patterns to capture (for example, "dbo.Employees"). Mutually exclusive with excludedTables. |
excludedTables | string|string[]? | () | Regex patterns of tables to exclude from capture. Mutually exclusive with includedTables. |
includedColumns | string|string[]? | () | Regex patterns of columns to capture. Mutually exclusive with excludedColumns. |
excludedColumns | string|string[]? | () | Regex patterns of columns to exclude from capture. Mutually exclusive with includedColumns. |
messageKeyColumns | cdc:MessageKeyColumns? | () | Composite message-key column mappings for change events. |
tasksMax | int | 1 | Maximum number of CDC tasks. |
streamingConfig | StreamingConfiguration? | () | Streaming and status-update configuration for CDC change events. |
MsSqlListenerConfiguration fields:
| Field | Type | Default | Description |
|---|---|---|---|
database | MsSqlDatabaseConnection | Required | The MSSQL CDC database connection configuration. |
options | MssqlOptions | {} | MSSQL-specific CDC options such as snapshotMode, skippedOperations, snapshot fetch size, and data type handling. |
engineName | string | "ballerina-cdc-connector" | Debezium engine instance name. Inherited from cdc:ListenerConfiguration. |
internalSchemaStorage | cdc:InternalSchemaStorage | {fileName: "tmp/dbhistory.dat"} | Schema-history storage configuration (file, Kafka, JDBC, Redis, S3, Azure Blob, RocketMQ, or in-memory). Inherited from cdc:ListenerConfiguration. |
offsetStorage | cdc:OffsetStorage | {fileName: "tmp/debezium-offsets.dat"} | Offset storage configuration (file, Kafka, JDBC, Redis, or in-memory). Inherited from cdc:ListenerConfiguration. |
livenessInterval | decimal | 60.0 | Interval, in seconds, for checking CDC listener liveness. Inherited from cdc:ListenerConfiguration. |
Initializing the listener
Basic CDC listener for a single database:
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;
configurable string host = ?;
configurable int port = ?;
configurable string user = ?;
configurable string password = ?;
configurable string database = ?;
listener mssql:CdcListener cdcListener = new (database = {
hostname: host,
port: port,
username: user,
password: password,
databaseNames: database,
includedTables: ["dbo.Transactions"]
});
CDC listener with schema and column filters:
import ballerinax/cdc;
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;
configurable string user = ?;
configurable string password = ?;
configurable string database = ?;
listener mssql:CdcListener cdcListener = new (
database = {
hostname: "db.example.com",
port: 1433,
username: user,
password: password,
databaseNames: database,
includedSchemas: "dbo",
includedTables: ["dbo.Transactions", "dbo.Orders"]
},
options = {
snapshotMode: cdc:NO_DATA,
skippedOperations: [cdc:TRUNCATE]
}
);
Service
A cdc:Service is a Ballerina service attached to a mssql:CdcListener. It listens for change events on the configured CDC-enabled tables and implements callbacks for each event type. You can type the callback parameters with your own Ballerina record types for automatic mapping.
Callback signatures
| Function | Signature | Description |
|---|---|---|
onRead | remote function onRead(record {} after) returns cdc:Error? | Invoked for each existing record during the initial CDC snapshot. |
onCreate | remote function onCreate(record {} after) returns cdc:Error? | Invoked when a new record is inserted. |
onUpdate | remote function onUpdate(record {} before, record {} after) returns cdc:Error? | Invoked when an existing record is updated. Receives both the before and after state. |
onDelete | remote function onDelete(record {} before) returns cdc:Error? | Invoked when a record is deleted. Receives the record state before deletion. |
onError | remote function onError(cdc:Error err) returns cdc:Error? | Invoked when the listener encounters an error during change-event delivery. |
Each row-level callback (onRead, onCreate, onUpdate, onDelete) accepts an optional trailing string tableName parameter to receive the qualified table identifier (schema.table) the event originated from. For example: remote function onCreate(record {} after, string tableName) returns cdc:Error?.
You do not need to implement all callbacks. Only implement the event types relevant to your use case.
Full usage example
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;
configurable string host = ?;
configurable int port = ?;
configurable string user = ?;
configurable string password = ?;
configurable string database = ?;
type Transaction record {|
int id;
string customerId;
decimal amount;
string status;
|};
listener mssql:CdcListener cdcListener = new (
database = {
hostname: host,
port: port,
username: user,
password: password,
databaseNames: database,
includedTables: ["dbo.Transactions"]
},
options = {
snapshotMode: cdc:NO_DATA
}
);
service cdc:Service on cdcListener {
isolated remote function onCreate(Transaction after) returns cdc:Error? {
log:printInfo("New transaction created", data = after.toString());
}
isolated remote function onUpdate(Transaction before, Transaction after) returns cdc:Error? {
log:printInfo("Transaction updated",
before = before.toString(),
after = after.toString()
);
}
isolated remote function onDelete(Transaction before) returns cdc:Error? {
log:printInfo("Transaction deleted", data = before.toString());
}
isolated remote function onError(cdc:Error err) returns cdc:Error? {
log:printError("CDC error", 'error = err);
}
}
CDC must be enabled on the SQL Server database and the specific tables you want to monitor. Use sys.sp_cdc_enable_db and sys.sp_cdc_enable_table to enable CDC at the database and table level. See the Setup Guide for full instructions.
Supporting types
MsSqlDatabaseConnection
| Field | Type | Description |
|---|---|---|
connectorClass | string | Debezium SQL Server connector class (default: "io.debezium.connector.sqlserver.SqlServerConnector"). |
hostname | string | Hostname of the MSSQL server (default: "localhost"). |
port | int | Port number of the MSSQL server (default: 1433). |
username | string | Database username for the CDC connection. Required. |
password | string | Database password for the CDC connection. Required. |
databaseNames | string|string[] | Name(s) of the database(s) to capture changes from. Required. |
connectTimeout | decimal? | Connection timeout in seconds. |
secure | cdc:SecureDatabaseConnection? | SSL/TLS configuration for the database connection. |
databaseInstance | string? | Named SQL Server instance, if applicable. |
includedSchemas | string|string[]? | Schema(s) to include in CDC capture. |
excludedSchemas | string|string[]? | Schema(s) to exclude from CDC capture. |
includedTables | string|string[]? | Tables to include in CDC capture. |
excludedTables | string|string[]? | Tables to exclude from CDC capture. |
includedColumns | string|string[]? | Columns to include in CDC capture. |
excludedColumns | string|string[]? | Columns to exclude from CDC capture. |
messageKeyColumns | cdc:MessageKeyColumns? | Composite message-key column mappings for change events. |
tasksMax | int | Maximum number of CDC tasks (default: 1). |
streamingConfig | StreamingConfiguration? | Streaming and status-update configuration for CDC change events. |