Triggers

The ballerinax/mssql connector supports event-driven integration through Debezium-based Change Data Capture (CDC). When rows in CDC-enabled tables are inserted, updated, or deleted, or are read during the initial snapshot, the mssql:CdcListener receives the change events in real time and invokes your service callbacks automatically.

Three components work together:

| Component | Role |
| --- | --- |
| mssql:CdcListener | Connects to MSSQL via Debezium and streams change events from CDC-enabled tables. |
| cdc:Service | Defines the onRead, onCreate, onUpdate, onDelete, and onError callbacks invoked per event. |
| MsSqlDatabaseConnection | Configuration record for the MSSQL CDC database connection (host, port, credentials, schema and table filters). |

For action-based record operations, see the Action Reference.


Listener

The mssql:CdcListener establishes the connection and manages event subscriptions.

Configuration

The listener is configured through the following record types:

| Config Type | Description |
| --- | --- |
| MsSqlDatabaseConnection | Configures the CDC database connection including server address, credentials, schema and table filtering, and column filtering. |
| MsSqlListenerConfiguration | Top-level listener configuration wrapping the database connection and CDC options. |

MsSqlDatabaseConnection fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| connectorClass | string | "io.debezium.connector.sqlserver.SqlServerConnector" | The Debezium SQL Server connector class name. |
| hostname | string | "localhost" | Hostname of the MSSQL server. |
| port | int | 1433 | Port number of the MSSQL server. |
| username | string | Required | Database username for the CDC connection. Inherited from cdc:DatabaseConnection. |
| password | string | Required | Database password for the CDC connection. Inherited from cdc:DatabaseConnection. |
| databaseNames | string\|string[] | Required | Name(s) of the database(s) to capture changes from. |
| connectTimeout | decimal? | () | Connection timeout in seconds. Inherited from cdc:DatabaseConnection. |
| secure | cdc:SecureDatabaseConnection? | () | SSL/TLS configuration for the database connection. Inherited from cdc:DatabaseConnection. |
| databaseInstance | string? | () | Named SQL Server instance, if applicable. |
| includedSchemas | string\|string[]? | () | Schema(s) to include in CDC capture (for example, "dbo"). Mutually exclusive with excludedSchemas. |
| excludedSchemas | string\|string[]? | () | Schema(s) to exclude from CDC capture. Mutually exclusive with includedSchemas. |
| includedTables | string\|string[]? | () | Table identifiers in schema.table format, or regex patterns to capture (for example, "dbo.Employees"). Mutually exclusive with excludedTables. |
| excludedTables | string\|string[]? | () | Regex patterns of tables to exclude from capture. Mutually exclusive with includedTables. |
| includedColumns | string\|string[]? | () | Regex patterns of columns to capture. Mutually exclusive with excludedColumns. |
| excludedColumns | string\|string[]? | () | Regex patterns of columns to exclude from capture. Mutually exclusive with includedColumns. |
| messageKeyColumns | cdc:MessageKeyColumns? | () | Composite message-key column mappings for change events. |
| tasksMax | int | 1 | Maximum number of CDC tasks. |
| streamingConfig | StreamingConfiguration? | () | Streaming and status-update configuration for CDC change events. |

MsSqlListenerConfiguration fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| database | MsSqlDatabaseConnection | Required | The MSSQL CDC database connection configuration. |
| options | MssqlOptions | {} | MSSQL-specific CDC options such as snapshotMode, skippedOperations, snapshot fetch size, and data type handling. |
| engineName | string | "ballerina-cdc-connector" | Debezium engine instance name. Inherited from cdc:ListenerConfiguration. |
| internalSchemaStorage | cdc:InternalSchemaStorage | {fileName: "tmp/dbhistory.dat"} | Schema-history storage configuration (file, Kafka, JDBC, Redis, S3, Azure Blob, RocketMQ, or in-memory). Inherited from cdc:ListenerConfiguration. |
| offsetStorage | cdc:OffsetStorage | {fileName: "tmp/debezium-offsets.dat"} | Offset storage configuration (file, Kafka, JDBC, Redis, or in-memory). Inherited from cdc:ListenerConfiguration. |
| livenessInterval | decimal | 60.0 | Interval, in seconds, for checking CDC listener liveness. Inherited from cdc:ListenerConfiguration. |
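
As a rough illustration of how these top-level fields combine, the sketch below overrides the engine name, both storage locations, and the liveness interval. The engine name and file paths are hypothetical placeholders, and the sketch assumes the file-backed storage variants accept a fileName field, as the defaults in the table suggest.

```ballerina
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

// Sketch only: the engine name and file paths below are illustrative.
listener mssql:CdcListener ordersListener = new (
    database = {
        username: user,
        password: password,
        databaseNames: database
    },
    // Replace the default "ballerina-cdc-connector" engine name.
    engineName = "orders-cdc-engine",
    // File-backed schema history and offsets at explicit paths
    // (assumes a fileName field, mirroring the documented defaults).
    internalSchemaStorage = {fileName: "data/orders-dbhistory.dat"},
    offsetStorage = {fileName: "data/orders-offsets.dat"},
    // Check listener liveness every 30 seconds instead of the default 60.
    livenessInterval = 30.0
);
```

hostname and port are omitted here, so the listener would fall back to the documented defaults of "localhost" and 1433.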

Initializing the listener

Basic CDC listener for a single database:

import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string host = ?;
configurable int port = ?;
configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

listener mssql:CdcListener cdcListener = new (database = {
    hostname: host,
    port: port,
    username: user,
    password: password,
    databaseNames: database,
    includedTables: ["dbo.Transactions"]
});

CDC listener with schema and table filters and CDC options:

import ballerinax/cdc;
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

listener mssql:CdcListener cdcListener = new (
    database = {
        hostname: "db.example.com",
        port: 1433,
        username: user,
        password: password,
        databaseNames: database,
        includedSchemas: "dbo",
        includedTables: ["dbo.Transactions", "dbo.Orders"]
    },
    options = {
        snapshotMode: cdc:NO_DATA,
        skippedOperations: [cdc:TRUNCATE]
    }
);
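
Column filters are set on the same database record. The following is a minimal sketch, assuming column patterns follow Debezium's schema.table.column naming; the dbo.Customers table and its column names are hypothetical.

```ballerina
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

// Sketch only: table and column names are illustrative.
listener mssql:CdcListener customerListener = new (database = {
    username: user,
    password: password,
    databaseNames: database,
    includedTables: "dbo.Customers",
    // Regex patterns; the dots are escaped so they match literally.
    // includedColumns is left unset because the two fields are mutually exclusive.
    excludedColumns: ["dbo\\.Customers\\.ssn", "dbo\\.Customers\\.card_.*"]
});
```

With this configuration, change events for dbo.Customers would be expected to omit the matching columns, which can help keep sensitive values out of downstream handlers.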

Service

A cdc:Service is a Ballerina service attached to a mssql:CdcListener. It listens for change events on the configured CDC-enabled tables and implements callbacks for each event type. You can type the callback parameters with your own Ballerina record types for automatic mapping.

Callback signatures

| Function | Signature | Description |
| --- | --- | --- |
| onRead | remote function onRead(record {} after) returns cdc:Error? | Invoked for each existing record during the initial CDC snapshot. |
| onCreate | remote function onCreate(record {} after) returns cdc:Error? | Invoked when a new record is inserted. |
| onUpdate | remote function onUpdate(record {} before, record {} after) returns cdc:Error? | Invoked when an existing record is updated. Receives both the before and after state. |
| onDelete | remote function onDelete(record {} before) returns cdc:Error? | Invoked when a record is deleted. Receives the record state before deletion. |
| onError | remote function onError(cdc:Error err) returns cdc:Error? | Invoked when the listener encounters an error during change-event delivery. |

Each row-level callback (onRead, onCreate, onUpdate, onDelete) accepts an optional trailing string tableName parameter to receive the qualified table identifier (schema.table) the event originated from. For example: remote function onCreate(record {} after, string tableName) returns cdc:Error?.
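
When one listener captures several tables, the tableName parameter can be used to tell events apart. The sketch below is one possible arrangement, not a prescribed pattern: the listener name, the log keys, and the exact "dbo.Orders" string compared against are assumptions based on the schema.table format described above.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

listener mssql:CdcListener salesListener = new (database = {
    username: user,
    password: password,
    databaseNames: database,
    includedTables: ["dbo.Transactions", "dbo.Orders"]
});

service cdc:Service on salesListener {
    // The trailing tableName parameter carries the source table identifier.
    remote function onCreate(record {} after, string tableName) returns cdc:Error? {
        if tableName == "dbo.Orders" {
            log:printInfo("Order inserted", data = after.toString());
        } else {
            log:printInfo("Row inserted", sourceTable = tableName, data = after.toString());
        }
    }

    remote function onDelete(record {} before, string tableName) returns cdc:Error? {
        log:printInfo("Row deleted", sourceTable = tableName, data = before.toString());
    }
}
```

The open record {} parameter type is used here because rows from different tables have different shapes; a typed record works when a service handles a single table.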

Note: You do not need to implement all callbacks. Only implement the event types relevant to your use case.

Full usage example

import ballerina/log;
import ballerinax/cdc;
import ballerinax/mssql;
import ballerinax/mssql.cdc.driver as _;

configurable string host = ?;
configurable int port = ?;
configurable string user = ?;
configurable string password = ?;
configurable string database = ?;

type Transaction record {|
    int id;
    string customerId;
    decimal amount;
    string status;
|};

listener mssql:CdcListener cdcListener = new (
    database = {
        hostname: host,
        port: port,
        username: user,
        password: password,
        databaseNames: database,
        includedTables: ["dbo.Transactions"]
    },
    options = {
        snapshotMode: cdc:NO_DATA
    }
);

service cdc:Service on cdcListener {
    isolated remote function onCreate(Transaction after) returns cdc:Error? {
        log:printInfo("New transaction created", data = after.toString());
    }

    isolated remote function onUpdate(Transaction before, Transaction after) returns cdc:Error? {
        log:printInfo("Transaction updated",
            before = before.toString(),
            after = after.toString()
        );
    }

    isolated remote function onDelete(Transaction before) returns cdc:Error? {
        log:printInfo("Transaction deleted", data = before.toString());
    }

    isolated remote function onError(cdc:Error err) returns cdc:Error? {
        log:printError("CDC error", 'error = err);
    }
}

Note: CDC must be enabled on the SQL Server database and on each table you want to monitor. Use sys.sp_cdc_enable_db and sys.sp_cdc_enable_table to enable CDC at the database and table level. See the Setup Guide for full instructions.


Supporting types

MsSqlDatabaseConnection

| Field | Type | Description |
| --- | --- | --- |
| connectorClass | string | Debezium SQL Server connector class (default: "io.debezium.connector.sqlserver.SqlServerConnector"). |
| hostname | string | Hostname of the MSSQL server (default: "localhost"). |
| port | int | Port number of the MSSQL server (default: 1433). |
| username | string | Database username for the CDC connection. Required. |
| password | string | Database password for the CDC connection. Required. |
| databaseNames | string\|string[] | Name(s) of the database(s) to capture changes from. Required. |
| connectTimeout | decimal? | Connection timeout in seconds. |
| secure | cdc:SecureDatabaseConnection? | SSL/TLS configuration for the database connection. |
| databaseInstance | string? | Named SQL Server instance, if applicable. |
| includedSchemas | string\|string[]? | Schema(s) to include in CDC capture. |
| excludedSchemas | string\|string[]? | Schema(s) to exclude from CDC capture. |
| includedTables | string\|string[]? | Tables to include in CDC capture. |
| excludedTables | string\|string[]? | Tables to exclude from CDC capture. |
| includedColumns | string\|string[]? | Columns to include in CDC capture. |
| excludedColumns | string\|string[]? | Columns to exclude from CDC capture. |
| messageKeyColumns | cdc:MessageKeyColumns? | Composite message-key column mappings for change events. |
| tasksMax | int | Maximum number of CDC tasks (default: 1). |
| streamingConfig | StreamingConfiguration? | Streaming and status-update configuration for CDC change events. |