Skip to main content

Triggers

The ballerinax/asb connector supports event-driven integration through the asb:Listener. When messages arrive on Azure Service Bus queues or topic subscriptions, the listener receives them and dispatches to your service callbacks automatically. You can settle messages using the asb:Caller provided to the onMessage callback.

Three components work together:

ComponentRole
asb:ListenerConnects to Azure Service Bus and continuously receives messages from a queue or subscription, dispatching them to service callbacks.
asb:ServiceDefines the onMessage and onError callbacks invoked when messages are received or retrieval errors occur.
asb:CallerProvided in the onMessage callback for settling messages (complete, abandon, dead-letter, defer).
asb:MessageThe message payload passed to the onMessage callback.
asb:MessageRetrievalErrorThe error record passed to the onError callback when message retrieval fails.

For action-based record operations, see the Action Reference.


Listener

The asb:Listener establishes the connection and manages event subscriptions.

Configuration

The listener supports the following connection strategies:

Config TypeDescription
ListenerConfigurationConfiguration for the ASB Listener. Extends ASBServiceReceiverConfig with additional listener-specific settings.

ListenerConfiguration fields:

FieldTypeDefaultDescription
connectionStringstringRequiredThe Azure Service Bus connection string.
entityConfigQueueConfig|TopicSubsConfigRequiredEntity configuration: either {queueName: "..."} for queues or {topicName: "...", subscriptionName: "..."} for topic subscriptions.
receiveModePEEK_LOCK|RECEIVE_AND_DELETEPEEK_LOCKThe receive mode. PEEK_LOCK requires explicit settlement; RECEIVE_AND_DELETE auto-removes on receive.
maxAutoLockRenewDurationint300Maximum duration (in seconds) to automatically renew the message lock.
amqpRetryOptionsAmqpRetryOptions()Retry options for AMQP operations.
autoCompletebooleantrueWhether to automatically complete messages after the onMessage callback returns successfully. Set to false to use manual settlement via the Caller.
prefetchCountint0Number of messages to prefetch from the broker. 0 disables prefetching.
maxConcurrencyint1Maximum number of concurrent message processing threads.

Initializing the listener

Listening on a queue:

import ballerinax/asb;

configurable string connectionString = ?;

listener asb:Listener asbListener = new ({
connectionString: connectionString,
entityConfig: {
queueName: "my-queue"
},
receiveMode: asb:PEEK_LOCK,
autoComplete: false
});

Listening on a topic subscription:

import ballerinax/asb;

configurable string connectionString = ?;

listener asb:Listener asbListener = new ({
connectionString: connectionString,
entityConfig: {
topicName: "my-topic",
subscriptionName: "my-subscription"
},
receiveMode: asb:PEEK_LOCK,
autoComplete: true,
maxConcurrency: 5
});

Service

An asb:Service is a Ballerina service attached to an asb:Listener. It implements callbacks for processing messages and handling retrieval errors from Azure Service Bus.

Callback signatures

FunctionSignatureDescription
onMessageremote function onMessage(asb:Message message) returns error? or remote function onMessage(asb:Message message, asb:Caller caller) returns error?Invoked when a message is received. Include asb:Caller only when using autoComplete: false for manual settlement. When autoComplete: true (default), omit caller: messages are completed automatically.
onErrorremote function onError(asb:MessageRetrievalError 'error) returns error?Invoked when an error occurs during message retrieval. Optional: if not implemented, retrieval errors are logged but not surfaced to your code.
info

When autoComplete is true (the default), messages are automatically completed after onMessage returns successfully. Set it to false for manual settlement.

Full usage example

import ballerina/log;
import ballerinax/asb;

configurable string connectionString = ?;

listener asb:Listener asbListener = new ({
connectionString: connectionString,
entityConfig: {
queueName: "my-queue"
},
receiveMode: asb:PEEK_LOCK,
autoComplete: false
});

service asb:Service on asbListener {
remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
// Process the message
log:printInfo("Received message",
messageId = message.messageId,
body = message.body.toString()
);

// Settle the message manually
check caller->complete();
}

remote function onError(asb:MessageRetrievalError 'error) returns error? {
log:printError("Error receiving message", 'error);
}
}
info

The onError callback is optional. If not implemented, retrieval errors are logged but not handled by your code.


Supporting types

Message

FieldTypeDescription
bodyanydataThe message body content.
contentTypestring?The MIME content type of the message body. Use the connector constants asb:TEXT ("text/plain"), asb:JSON ("application/json"), asb:XML ("application/xml"), or asb:BYTE_ARRAY ("application/octet-stream").
messageIdstring?A unique identifier for the message.
tostring?The destination address of the message.
replyTostring?The address to reply to.
replyToSessionIdstring?The session ID to reply to.
labelstring?An application-specific label for the message.
sessionIdstring?The session identifier for session-aware entities.
correlationIdstring?A correlation identifier for request-reply patterns.
partitionKeystring?The partition key for partitioned entities.
timeToLiveint?The message time-to-live in seconds.
sequenceNumberint?The unique sequence number assigned by Service Bus.
lockTokenstring?The lock token for the message (used in PEEK_LOCK mode).
applicationPropertiesApplicationProperties?Custom application properties attached to the message. Access values via message.applicationProperties?.properties.
deliveryCountint?The number of times delivery has been attempted.
enqueuedTimestring?The UTC time when the message was enqueued.
enqueuedSequenceNumberint?The enqueued sequence number.
deadLetterSourcestring?The name of the entity where the message was dead-lettered.
statestring?The state of the message (Active, Deferred, Scheduled).

Caller

MethodParametersReturnsDescription
complete-error?Completes the message, removing it from the queue.
abandon*propertiesToModify: optional rest-record of anydata key-value pairs to set on the message before releasing the lockerror?Abandons the message, releasing its lock for redelivery.
deadLetter*DeadLetterOptions: optional deadLetterReason: string, deadLetterErrorDescription: string, propertiesToModify: map<anydata>error?Moves the message to the dead-letter sub-queue.
defer*propertiesToModify: optional rest-record of anydata key-value pairserror?Defers the message. Unlike MessageReceiver.defer(), this does not return the sequence number: read message.sequenceNumber before calling if you need it. Retrieve the deferred message via receiver->receiveDeferred(sequenceNumber).

MessageRetrievalError

asb:MessageRetrievalError is a distinct Error carrying an ErrorContext detail record. Access the detail via err.detail().

FieldTypeDescription
entityPathstringThe entity path (queue or subscription) where the error occurred.
classNamestringThe name of the class that originated the error.
namespacestringThe Service Bus namespace associated with the error.
errorSourcestringThe function or action that was the source of the error.
reasonstringA description of the reason for the error.
remote function onError(asb:MessageRetrievalError err) returns error? {
asb:ErrorContext detail = err.detail();
log:printError("Message retrieval failed",
entityPath = detail.entityPath,
reason = detail.reason
);
}

What's next

  • Action Reference; use MessageReceiver for polling-based message consumption
  • Setup Guide: obtain the connection string required for the listener
  • Example: complete worked example for event-driven trigger setup