Actions
The ballerinax/rabbitmq package exposes the following clients:
| Client | Purpose |
|---|---|
| Client | Publish messages, consume messages, and manage queues and exchanges on a RabbitMQ broker. |
For event-driven integration, see the Trigger Reference.
Client
Publish messages, consume messages, and manage queues and exchanges on a RabbitMQ broker.
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| host | string | Required | RabbitMQ server hostname (constructor parameter). |
| port | int | Required | RabbitMQ server AMQP port (constructor parameter). |
| auth | Credentials | () | Username and password credentials for authentication. |
| virtualHost | string | () | The virtual host to connect to. |
| connectionTimeout | decimal | () | Connection timeout in seconds. |
| handshakeTimeout | decimal | () | AMQP 0-9-1 protocol handshake timeout in seconds. |
| shutdownTimeout | decimal | () | Shutdown timeout in seconds. |
| heartbeat | decimal | () | Heartbeat interval in seconds. |
| validation | boolean | true | Enable constraint validation for messages. |
| secureSocket | SecureSocket | () | TLS/SSL configuration for secure connections. |
Initializing the client
import ballerinax/rabbitmq;

configurable string host = "localhost";
configurable int port = 5672;

rabbitmq:Client rabbitmqClient = check new (host, port);
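The optional fields from the configuration table can presumably be supplied as named arguments after `host` and `port`. The following is a minimal sketch under that assumption. It also assumes that `Credentials` is a record with `username` and `password` fields, and the `guest` credentials, the `/` virtual host, and the timeout values are placeholders rather than recommended settings.

```ballerina
import ballerinax/rabbitmq;

// Placeholder credentials; override them in Config.toml.
configurable string username = "guest";
configurable string password = "guest";

public function main() returns error? {
    // Assumption: optional configuration fields are accepted as named arguments.
    rabbitmq:Client securedClient = check new ("localhost", 5672,
        auth = {username: username, password: password}, // assumed Credentials fields
        virtualHost = "/",
        connectionTimeout = 10, // seconds
        heartbeat = 30 // seconds
    );

    // Release the connection once it is no longer needed.
    check securedClient->close();
}
```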
Operations
Queue management
queueDeclare
Declares a queue on the RabbitMQ server. Creates the queue if it does not exist.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | The name of the queue to declare. |
| config | QueueConfig? | No | Queue configuration (durable, exclusive, autoDelete, arguments). |
Returns: Error?
Sample code:
check rabbitmqClient->queueDeclare("OrderQueue", config = {
    durable: true,
    autoDelete: false
});
queueAutoGenerate
Declares a queue with a server-generated unique name and returns the name.
Parameters: None
Returns: string|error
Sample code:
string generatedQueueName = check rabbitmqClient->queueAutoGenerate();
Sample response:
"amq.gen-Xa2Kh5Q7F3mR1s0T"
queueDelete
Deletes a queue from the RabbitMQ server.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | string | Yes | The name of the queue to delete. |
| ifUnused | boolean | No | Delete only if the queue has no consumers. |
| ifEmpty | boolean | No | Delete only if the queue is empty. |
Returns: Error?
Sample code:
check rabbitmqClient->queueDelete("OrderQueue");
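The two optional flags can be combined to guard against deleting a queue that is still in use. A minimal sketch, assuming a broker on `localhost:5672` and an existing `OrderQueue`; if either condition is not met, the call is expected to return an error, which `check` propagates to the caller.

```ballerina
import ballerinax/rabbitmq;

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    // Delete only if the queue has no consumers and holds no messages.
    check rabbitmqClient->queueDelete("OrderQueue", ifUnused = true, ifEmpty = true);

    check rabbitmqClient->close();
}
```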
queuePurge
Removes all messages from a queue without deleting the queue itself.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | string | Yes | The name of the queue to purge. |
Returns: Error?
Sample code:
check rabbitmqClient->queuePurge("OrderQueue");
Exchange management
exchangeDeclare
Declares an exchange on the RabbitMQ server.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | The name of the exchange to declare. |
| exchangeType | ExchangeType | No | The exchange type: DIRECT_EXCHANGE, FANOUT_EXCHANGE, TOPIC_EXCHANGE, or HEADERS_EXCHANGE. |
| config | ExchangeConfig? | No | Exchange configuration (durable, autoDelete, arguments). |
Returns: Error?
Sample code:
check rabbitmqClient->exchangeDeclare("OrderExchange", rabbitmq:TOPIC_EXCHANGE, config = {
    durable: true
});
exchangeDelete
Deletes an exchange from the RabbitMQ server.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| exchangeName | string | Yes | The name of the exchange to delete. |
Returns: Error?
Sample code:
check rabbitmqClient->exchangeDelete("OrderExchange");
queueBind
Binds a queue to an exchange with a routing/binding key.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | string | Yes | The name of the queue to bind. |
| exchangeName | string | Yes | The name of the exchange to bind to. |
| bindingKey | string | Yes | The binding key for the queue-exchange binding. |
Returns: Error?
Sample code:
check rabbitmqClient->queueBind("OrderQueue", "OrderExchange", "orders.#");
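A common pattern is to bind a server-named queue to a fanout exchange so that each subscriber receives its own copy of every message. The sketch below combines `exchangeDeclare`, `queueAutoGenerate`, and `queueBind` to illustrate this; `BroadcastExchange` is a hypothetical exchange name and the broker address is assumed.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    // "BroadcastExchange" is a hypothetical name used for illustration.
    check rabbitmqClient->exchangeDeclare("BroadcastExchange", rabbitmq:FANOUT_EXCHANGE);

    // Let the server pick a unique queue name.
    string tempQueue = check rabbitmqClient->queueAutoGenerate();

    // Fanout exchanges ignore the binding key, so an empty string is passed.
    check rabbitmqClient->queueBind(tempQueue, "BroadcastExchange", "");
    io:println("Bound temporary queue: ", tempQueue);

    check rabbitmqClient->close();
}
```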
Publish
publishMessage
Publishes a message to a queue or exchange with the specified routing key.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| message | AnydataMessage | Yes | The message to publish, containing content, routingKey, and optionally exchange and properties. |
Returns: Error?
Sample code:
check rabbitmqClient->publishMessage({
    content: "Hello from Ballerina",
    routingKey: "OrderQueue"
});
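To route through a named exchange rather than directly to a queue, the optional `exchange` field can be set on the message. The following sketch publishes a record as the content; the `Order` type and its values are hypothetical, and it assumes `OrderExchange` has already been declared as a topic exchange with a queue bound using a matching key such as `orders.#`.

```ballerina
import ballerinax/rabbitmq;

// Hypothetical payload type used for illustration.
type Order record {|
    string orderId;
    int quantity;
|};

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    Order newOrder = {orderId: "ORD-1001", quantity: 2};

    // The exchange routes the message to queues whose binding key
    // matches "orders.created".
    check rabbitmqClient->publishMessage({
        content: newOrder,
        routingKey: "orders.created",
        exchange: "OrderExchange"
    });

    check rabbitmqClient->close();
}
```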
Consume
consumeMessage
Synchronously retrieves a single message from a queue, including metadata.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | string | Yes | The queue to consume from. |
| autoAck | boolean | No | If true, the message is automatically acknowledged. |
| T | typedesc<AnydataMessage> | No | Expected message type for data binding. |
Returns: AnydataMessage|error
Sample code:
rabbitmq:AnydataMessage message = check rabbitmqClient->consumeMessage("OrderQueue");
Sample response:
{"content": "Hello from Ballerina", "routingKey": "OrderQueue", "exchange": "", "deliveryTag": 1}
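The `T` parameter suggests that the expected message type can be inferred from the variable on the left-hand side. A minimal sketch of that data binding follows, assuming the queue holds a message whose content matches the hypothetical `Order` record; `OrderMessage` is likewise a hypothetical subtype of `AnydataMessage` that narrows the `content` field.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

// Hypothetical payload type used for illustration.
type Order record {|
    string orderId;
    int quantity;
|};

// Narrows the content field of AnydataMessage to the Order type.
type OrderMessage record {|
    *rabbitmq:AnydataMessage;
    Order content;
|};

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    // The expected type is inferred from the declared variable type.
    OrderMessage message = check rabbitmqClient->consumeMessage("OrderQueue");
    io:println("Order ", message.content.orderId, " via key ", message.routingKey);

    check rabbitmqClient->close();
}
```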
consumePayload
Synchronously retrieves only the payload content from a queue message.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | string | Yes | The queue to consume from. |
| autoAck | boolean | No | If true, the message is automatically acknowledged. |
| T | typedesc<anydata> | No | Expected payload type for data binding. |
Returns: anydata|error
Sample code:
string payload = check rabbitmqClient->consumePayload("OrderQueue");
Sample response:
"Hello from Ballerina"
Acknowledgement
basicAck
Acknowledges one or more received messages by delivery tag or message reference.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| ackTarget | `AnydataMessage\|int` | Yes | The message, or its delivery tag, to acknowledge. |
| multiple | boolean | No | If true, acknowledges all messages up to and including the given delivery tag. |
Returns: Error?
Sample code:
rabbitmq:AnydataMessage message = check rabbitmqClient->consumeMessage("OrderQueue", autoAck = false);
check rabbitmqClient->basicAck(message);
basicNack
Rejects one or more received messages, optionally requeuing them.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| ackTarget | `AnydataMessage\|int` | Yes | The message, or its delivery tag, to reject. |
| multiple | boolean | No | If true, rejects all messages up to and including the given delivery tag. |
| requeue | boolean | No | If true, rejected messages are requeued rather than discarded. |
Returns: Error?
Sample code:
rabbitmq:AnydataMessage message = check rabbitmqClient->consumeMessage("OrderQueue", autoAck = false);
check rabbitmqClient->basicNack(message, requeue = true);
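In practice, the choice between `basicAck` and `basicNack` usually depends on whether processing succeeded. The sketch below shows one way to wire that decision; the `process` function is a hypothetical stand-in for application logic, and the broker address and queue name are assumed.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

// Hypothetical processing step: rejects empty content.
function process(anydata content) returns error? {
    if content.toString().length() == 0 {
        return error("empty message");
    }
    io:println("Processed: ", content);
}

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    // Disable auto-acknowledgement so the outcome can be reported explicitly.
    rabbitmq:AnydataMessage message = check rabbitmqClient->consumeMessage("OrderQueue", autoAck = false);

    error? result = process(message.content);
    if result is error {
        // Return the message to the queue for another attempt.
        check rabbitmqClient->basicNack(message, requeue = true);
    } else {
        check rabbitmqClient->basicAck(message);
    }

    check rabbitmqClient->close();
}
```

Requeuing a message that always fails will cause it to be redelivered indefinitely, so a real consumer would typically cap retries or pass `requeue = false` after repeated failures.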
Connection lifecycle
close
Closes the RabbitMQ client channel and connection gracefully.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| closeCode | int? | No | The close code to send to the server. |
| closeMessage | string? | No | The close message to send to the server. |
Returns: Error?
Sample code:
check rabbitmqClient->close();
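Because `check` returns immediately on failure, an error in an earlier call would skip the `close` call. One way to make sure the connection is released is to capture the result first and return it after closing. This is a sketch of that pattern, not a prescribed approach; the message content and queue name are placeholders.

```ballerina
import ballerinax/rabbitmq;

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new ("localhost", 5672);

    // Capture the result instead of using check, so close always runs.
    rabbitmq:Error? publishResult = rabbitmqClient->publishMessage({
        content: "ping",
        routingKey: "OrderQueue"
    });

    check rabbitmqClient->close();

    // Surface the publish error, if any, after the connection is closed.
    return publishResult;
}
```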
abort
Force-closes the client connection, discarding any pending operations or exceptions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
| closeCode | int? | No | The close code. |
| closeMessage | string? | No | The close message. |
Returns: Error?
Sample code:
check rabbitmqClient->'abort();