Triggers

The ballerinax/rabbitmq connector supports event-driven message consumption through a rabbitmq:Listener and rabbitmq:Service. When messages arrive on a subscribed queue, the listener dispatches them to your service callbacks automatically, supporting both one-way message processing and request-reply patterns.

Four components work together:

| Component | Role |
| --- | --- |
| rabbitmq:Listener | Connects to the RabbitMQ broker, subscribes to queues, and dispatches incoming messages to attached services. |
| rabbitmq:Service | Defines the onMessage or onRequest callback invoked when messages arrive on the configured queue. |
| rabbitmq:Caller | Provided to callbacks for manual message acknowledgement or rejection when autoAck is false. |
| rabbitmq:AnydataMessage | The message passed to each callback, containing content, routing key, exchange, delivery tag, and properties. |

For action-based client operations, see the Action Reference.


Listener

The rabbitmq:Listener establishes the connection and manages event subscriptions.

Configuration

The listener is configured through the following type:

| Config Type | Description |
| --- | --- |
| ConnectionConfiguration | Connection settings for the RabbitMQ listener, including authentication, virtual host, timeouts, and TLS. |

ConnectionConfiguration fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| host | string | Required | RabbitMQ server hostname (constructor parameter). |
| port | int | Required | RabbitMQ server AMQP port (constructor parameter). |
| qosSettings | QosSettings? | () | Quality of Service prefetch settings (constructor parameter). |
| auth | Credentials | () | Username and password credentials. |
| virtualHost | string | () | The virtual host to connect to. |
| connectionTimeout | decimal | () | Connection timeout in seconds. |
| handshakeTimeout | decimal | () | TLS handshake timeout in seconds. |
| shutdownTimeout | decimal | () | Shutdown timeout in seconds. |
| heartbeat | decimal | () | Heartbeat interval in seconds. |
| secureSocket | SecureSocket | () | TLS/SSL configuration for secure connections. |

Initializing the listener

Basic listener with default settings:

import ballerinax/rabbitmq;

listener rabbitmq:Listener channelListener = new ("localhost", 5672);

Listener with credentials and QoS prefetch:

import ballerinax/rabbitmq;

configurable string host = ?;
configurable int port = ?;
configurable string username = ?;
configurable string password = ?;

listener rabbitmq:Listener channelListener = new (host, port,
    qosSettings = { prefetchCount: 10 },
    auth = { username: username, password: password }
);

Listener with TLS:

import ballerinax/rabbitmq;

listener rabbitmq:Listener channelListener = new ("rabbitmq.example.com", 5671,
    auth = { username: "myuser", password: "mypass" },
    secureSocket = {
        cert: "/path/to/ca_certificate.pem",
        key: {
            certFile: "/path/to/client_certificate.pem",
            keyFile: "/path/to/client_key.pem"
        }
    }
);

Service

A rabbitmq:Service is a Ballerina service attached to a rabbitmq:Listener. It is annotated with @rabbitmq:ServiceConfig, which specifies the queue to consume from and the acknowledgement mode. Implement onMessage for one-way consumption or onRequest for request-reply patterns.

Callback signatures

| Function | Signature | Description |
| --- | --- | --- |
| onMessage | remote function onMessage(rabbitmq:AnydataMessage message, rabbitmq:Caller caller) returns error? | Invoked when a message arrives on the queue. Use for one-way message consumption. The caller parameter is optional. |
| onRequest | remote function onRequest(rabbitmq:AnydataMessage message, rabbitmq:Caller caller) returns anydata | Invoked when a message arrives and a reply is expected. The return value is sent back as the response. The caller parameter is optional. |
Note: A service can implement either onMessage or onRequest, but not both. Use onMessage for fire-and-forget consumption and onRequest when the publisher expects a reply.
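The request-reply callback described above can be sketched as follows. This is a minimal illustration rather than a definitive implementation: the queue name RpcQueue and the listener name rpcListener are hypothetical, the caller parameter is omitted because it is optional, and the string return type is assumed to be acceptable as a subtype of anydata.

```ballerina
import ballerinax/rabbitmq;

// Hypothetical listener for this sketch; assumes a local broker on the default AMQP port.
listener rabbitmq:Listener rpcListener = new ("localhost", 5672);

@rabbitmq:ServiceConfig {
    // Hypothetical queue name used only for illustration.
    queueName: "RpcQueue"
}
service rabbitmq:Service on rpcListener {

    // The returned value is sent back to the publisher as the reply.
    remote function onRequest(rabbitmq:AnydataMessage message) returns string {
        return "Processed request with routing key: " + message.routingKey;
    }
}
```

The reply is delivered only if the publisher set a reply queue, which corresponds to the replyTo property on the incoming message.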

Full usage example

import ballerina/log;
import ballerinax/rabbitmq;

configurable string host = "localhost";
configurable int port = 5672;

listener rabbitmq:Listener channelListener = new (host, port);

@rabbitmq:ServiceConfig {
    queueName: "OrderQueue",
    autoAck: false
}
service rabbitmq:Service on channelListener {

    remote function onMessage(rabbitmq:AnydataMessage message, rabbitmq:Caller caller) returns error? {
        string|error content = string:fromBytes(check message.content.ensureType());
        if content is string {
            log:printInfo("Received order", payload = content, routingKey = message.routingKey);
        }
        // Acknowledge the message after successful processing
        check caller->basicAck();
    }
}
Note: When autoAck is set to true (the default), messages are acknowledged automatically before the callback executes. Set autoAck: false and use rabbitmq:Caller for manual acknowledgement when you need to ensure messages are only acknowledged after successful processing.
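Manual acknowledgement also makes it possible to reject a message when processing fails. The sketch below assumes that rabbitmq:Caller exposes a basicNack remote method with a requeue parameter alongside basicAck. The processOrder function and the ackListener name are hypothetical placeholders for application logic.

```ballerina
import ballerina/log;
import ballerinax/rabbitmq;

// Hypothetical listener for this sketch; assumes a local broker.
listener rabbitmq:Listener ackListener = new ("localhost", 5672);

@rabbitmq:ServiceConfig {
    queueName: "OrderQueue",
    autoAck: false
}
service rabbitmq:Service on ackListener {

    remote function onMessage(rabbitmq:AnydataMessage message, rabbitmq:Caller caller) returns error? {
        error? result = processOrder(message);
        if result is error {
            log:printError("Order processing failed", 'error = result);
            // Assumed API: negatively acknowledge and ask the broker to requeue the message.
            check caller->basicNack(requeue = true);
            return;
        }
        // Acknowledge only after processing succeeded.
        check caller->basicAck();
    }
}

// Hypothetical processing step; rejects messages that carry no payload.
function processOrder(rabbitmq:AnydataMessage message) returns error? {
    if message.content is () {
        return error("Empty payload");
    }
}
```

Requeuing a message that always fails can cause a redelivery loop, so a real service would likely cap retries or route such messages to a dead-letter queue.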


Supporting types

AnydataMessage

| Field | Type | Description |
| --- | --- | --- |
| content | anydata | The message payload content. |
| routingKey | string | The routing key the message was published with. |
| exchange | string | The exchange the message was published to (empty string for default exchange). |
| deliveryTag | int? | The delivery tag assigned by the broker (present on consumed messages). |
| properties | BasicProperties? | Optional message properties (replyTo, contentType, contentEncoding, correlationId, headers). |

BasicProperties

| Field | Type | Description |
| --- | --- | --- |
| replyTo | string? | The queue name for reply messages in request-reply patterns. |
| contentType | string? | MIME type of the message content. |
| contentEncoding | string? | Encoding of the message content. |
| correlationId | string? | Correlation identifier for matching requests with replies. |
| headers | map<anydata>? | Custom headers as key-value pairs. |
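Because properties and its fields may be absent, a callback has to read them defensively. The following sketch shows one way to do that with optional field access and default values. The listener name propsListener and the fallback strings are illustrative assumptions.

```ballerina
import ballerina/log;
import ballerinax/rabbitmq;

// Hypothetical listener for this sketch; assumes a local broker.
listener rabbitmq:Listener propsListener = new ("localhost", 5672);

@rabbitmq:ServiceConfig {
    queueName: "OrderQueue"
}
service rabbitmq:Service on propsListener {

    remote function onMessage(rabbitmq:AnydataMessage message) returns error? {
        // properties may be missing, so use optional field access throughout.
        rabbitmq:BasicProperties? props = message?.properties;
        string correlationId = props?.correlationId ?: "none";
        string contentType = props?.contentType ?: "unknown";

        log:printInfo("Received message",
            exchange = message.exchange,
            routingKey = message.routingKey,
            correlationId = correlationId,
            contentType = contentType);
    }
}
```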

QosSettings

| Field | Type | Description |
| --- | --- | --- |
| prefetchCount | int | Maximum number of unacknowledged messages the server will deliver to the consumer. |
| prefetchSize | int? | Maximum total size (in bytes) of unacknowledged messages. |
| global | boolean | If true, QoS settings apply to the entire channel; otherwise per-consumer. |

RabbitMQServiceConfig

| Field | Type | Description |
| --- | --- | --- |
| queueName | string | The name of the queue to consume messages from. |
| config | QueueConfig? | Optional queue configuration; the queue is declared if it does not exist. |
| autoAck | boolean | If true (default), messages are automatically acknowledged before the callback. |
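The config field can be used to have the queue declared when the service starts. The sketch below assumes that QueueConfig has durable, exclusive, and autoDelete boolean fields. These field names are not listed on this page, so verify them against the connector's API documentation. The listener name queueListener is hypothetical.

```ballerina
import ballerina/log;
import ballerinax/rabbitmq;

// Hypothetical listener for this sketch; assumes a local broker.
listener rabbitmq:Listener queueListener = new ("localhost", 5672);

@rabbitmq:ServiceConfig {
    queueName: "OrderQueue",
    // Assumed QueueConfig fields; the queue is declared with these settings if it does not exist.
    config: {
        durable: true,
        exclusive: false,
        autoDelete: false
    },
    autoAck: true
}
service rabbitmq:Service on queueListener {

    remote function onMessage(rabbitmq:AnydataMessage message) returns error? {
        log:printInfo("Received message", routingKey = message.routingKey);
    }
}
```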