Actions

The ballerinax/nats package exposes the following clients:

| Client | Purpose |
|---|---|
| Client | Connects to a NATS server to publish messages and perform request-reply exchanges. |
| JetStreamClient | Manages JetStream streams and publishes or pull-consumes persistent messages. |

Client

Connects to a NATS server to publish messages and perform request-reply exchanges.

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| url | `string\|string[]` | Required | URL of the NATS server, or an array of server URLs. |
| connectionName | string | "ballerina-nats" | A human-readable name for this connection, useful for server-side monitoring. |
| auth | `Credentials\|Tokens\|()` | () | Authentication settings: username/password credentials or a token. |
| retryConfig | RetryConfig? | () | Reconnection retry settings including maxReconnect, reconnectWait, and connectionTimeout. |
| ping | Ping? | () | Ping interval configuration with pingInterval (seconds) and maxPingsOut fields. |
| secureSocket | SecureSocket? | () | TLS/SSL configuration with CA certificate, client certificate, and key paths. |
| inboxPrefix | string | "_INBOX." | Prefix used for reply-to subjects in request/reply messaging. |
| noEcho | boolean | false | When true, the server will not echo messages back to the connection that published them. |
| validation | boolean | true | Enables payload validation on message receive. |

Initializing the client

import ballerinax/nats;

configurable string natsUrl = "nats://localhost:4222";
configurable string username = ?;
configurable string password = ?;

nats:Client natsClient = check new (natsUrl,
    auth = {
        username: username,
        password: password
    },
    connectionName = "my-ballerina-client"
);
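
The reconnection and ping settings from the configuration table can be passed in the same way. The following is a minimal sketch, assuming the field names listed above (maxReconnect, reconnectWait, connectionTimeout, pingInterval, maxPingsOut); the numeric values and the connection name are illustrative assumptions, not recommended defaults.

```ballerina
import ballerinax/nats;

configurable string natsUrl = "nats://localhost:4222";

public function main() returns error? {
    // All values below are illustrative assumptions; tune them for your deployment.
    nats:Client natsClient = check new (natsUrl,
        connectionName = "orders-service",
        retryConfig = {
            maxReconnect: 10,
            reconnectWait: 5,
            connectionTimeout: 30
        },
        ping = {
            pingInterval: 60, // seconds, per the configuration table
            maxPingsOut: 3
        },
        // Do not receive messages published on this same connection.
        noEcho = true
    );

    check natsClient->close();
}
```

Since url also accepts a string[], several server URLs can be supplied in place of natsUrl.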

Operations

Publish

publishMessage

Publishes a message to a NATS subject. This is a fire-and-forget operation; no acknowledgement is returned by the broker.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | AnydataMessage | Yes | The message to publish. Must include a subject and content (any Ballerina anydata value). Optionally set replyTo to specify a reply subject. |

Returns: Error?

Sample code:

import ballerina/io;
import ballerinax/nats;

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);

    check natsClient->publishMessage({
        content: "Hello, NATS!".toBytes(),
        subject: "demo.greet"
    });

    io:println("Message published successfully.");
    check natsClient->close();
}

Request-Reply

requestMessage

Sends a request message to a subject and waits for a single reply, implementing the request-reply messaging pattern. The caller blocks until a reply is received or the timeout expires.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | AnydataMessage | Yes | The request message with a subject and content. The replyTo field is automatically set by the client. |
| duration | decimal? | No | Timeout in seconds to wait for a reply. If (), waits indefinitely. |
| T | `typedesc<AnydataMessage>` | No | The expected type of the returned message. Defaults to nats:AnydataMessage. |

Returns: T|Error

Sample code:

import ballerina/io;
import ballerinax/nats;

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);

    nats:AnydataMessage reply = check natsClient->requestMessage(
        {content: "ping".toBytes(), subject: "demo.echo"},
        5.0 // timeout: 5 seconds
    );

    string replyContent = check string:fromBytes(<byte[]>reply.content);
    io:println("Reply received: ", replyContent);
    check natsClient->close();
}

Sample response:

{"content": [112, 111, 110, 103], "subject": "_INBOX.abc123", "replyTo": ()}
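
Because requestMessage returns T|Error, the result can also be inspected instead of propagated with check. The sketch below assumes that an expired timeout surfaces as a nats:Error; decodeReply is a hypothetical helper introduced here for illustration and is not part of the package.

```ballerina
import ballerina/io;
import ballerinax/nats;

// Hypothetical helper: decodes a reply whose content is expected to be a UTF-8 byte array.
function decodeReply(nats:AnydataMessage reply) returns string|error {
    anydata content = reply.content;
    if content is byte[] {
        return string:fromBytes(content);
    }
    return error("Unexpected reply content type");
}

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);

    // Keep the error as a value rather than returning early with `check`.
    nats:AnydataMessage|nats:Error reply = natsClient->requestMessage(
        {content: "ping".toBytes(), subject: "demo.echo"},
        2.0 // timeout: 2 seconds
    );

    if reply is nats:Error {
        // Assumption: a timeout is reported through this branch.
        io:println("Request failed or timed out: ", reply.message());
    } else {
        io:println("Reply received: ", check decodeReply(reply));
    }

    check natsClient->close();
}
```

Handling the error locally lets the caller fall back or retry while still closing the connection afterwards.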

Connection management

close

Closes the connection to the NATS server and releases all associated resources.

Parameters:

None.

Returns: Error?

Sample code:

check natsClient->close();

JetStreamClient

Manages JetStream streams and publishes or pull-consumes persistent messages.

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| natsClient | nats:Client | Required | An already-initialized nats:Client instance. The JetStream client reuses the underlying NATS connection. |
| name | string? | () | Stream name. Must be unique within the JetStream account. |
| subjects | `string\|string[]?` | () | Subject or subjects captured by the stream. |
| storageType | StorageType? | FILE | Storage backend: FILE for disk persistence or MEMORY for in-memory storage. |
| retentionPolicy | RetentionPolicy? | LIMITS | Retention policy: LIMITS (size/age-based), INTEREST (consumer-based), or WORKQUEUE (consume-once). |
| maxMsgs | float? | () | Maximum number of messages retained in the stream. |
| maxBytes | float? | () | Maximum total bytes retained in the stream. |
| maxAge | decimal? | () | Maximum age of messages in nanoseconds before they are removed. |
| replicas | int? | 1 | Number of replicas for the stream in a clustered deployment. |

Initializing the client

import ballerinax/nats;

configurable string natsUrl = "nats://localhost:4222";

// First create the core NATS client
nats:Client natsClient = check new (natsUrl);

// Then create the JetStream client from the core client
nats:JetStreamClient jsClient = check new (natsClient);

public function main() returns error? {
    // Create a stream before publishing
    check jsClient->addStream({
        name: "ORDERS",
        subjects: "orders.*",
        storageType: nats:FILE,
        retentionPolicy: nats:LIMITS
    });
}

Operations

Stream management

addStream

Creates a new JetStream stream with the specified configuration.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| streamConfig | StreamConfiguration | Yes | Configuration for the new stream including name, subjects, storage type, and retention policy. |

Returns: Error?

Sample code:

check jsClient->addStream({
    name: "EVENTS",
    subjects: ["events.>"],
    storageType: nats:FILE,
    maxMsgs: 1000000.0,
    maxAge: 86400000000000.0 // 24 hours in nanoseconds
});

updateStream

Updates the configuration of an existing JetStream stream.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| streamConfig | StreamConfiguration | Yes | Updated stream configuration. The name field must match an existing stream. |

Returns: Error?

Sample code:

check jsClient->updateStream({
    name: "ORDERS",
    subjects: "orders.*",
    maxMsgs: 500000.0
});

deleteStream

Permanently deletes a JetStream stream and all its messages.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| streamName | string | Yes | The name of the stream to delete. |

Returns: Error?

Sample code:

check jsClient->deleteStream("ORDERS");

purgeStream

Removes all messages from a JetStream stream without deleting the stream itself.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| streamName | string | Yes | The name of the stream to purge. |

Returns: Error?

Sample code:

check jsClient->purgeStream("ORDERS");

Persistent messaging

publishMessage

Publishes a message to a JetStream subject. The message is persisted according to the stream's retention policy.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | JetStreamMessage | Yes | The message to publish, with subject and content (byte array) fields. |

Returns: Error?

Sample code:

import ballerinax/nats;

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);
    nats:JetStreamClient jsClient = check new (natsClient);

    check jsClient->publishMessage({
        subject: "orders.new",
        content: "{\"orderId\": \"ORD-001\", \"amount\": 99.99}".toBytes()
    });
}

consumeMessage

Pull-fetches a single message from a JetStream subject. Blocks until a message is available or the timeout expires.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| subject | string | Yes | The JetStream subject to consume from. |
| timeout | decimal | Yes | Maximum time in seconds to wait for a message. |

Returns: JetStreamMessage|Error

Sample code:

nats:JetStreamMessage msg = check jsClient->consumeMessage("orders.new", 5.0);
string payload = check string:fromBytes(msg.content);
io:println("Consumed order: ", payload);

// Acknowledge the message
jsClient->ack(msg);

Sample response:

{"subject": "orders.new", "content": [123, 34, 111, 114, 100, 101, 114, 73, 100, 34, 58, 32, 34, 79, 82, 68, 45, 48, 48, 49, 34, 125]}

Message acknowledgement

ack

Acknowledges a JetStream message, signalling that it has been successfully processed. The server will not redeliver it.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | JetStreamMessage | Yes | The message to acknowledge. |

Returns: ()

Sample code:

nats:JetStreamMessage msg = check jsClient->consumeMessage("orders.new", 5.0);
// Process the message...
jsClient->ack(msg);

nak

Negatively acknowledges a JetStream message, indicating processing failure. The server will redeliver the message.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | JetStreamMessage | Yes | The message to negatively acknowledge. |

Returns: ()

Sample code:

nats:JetStreamMessage msg = check jsClient->consumeMessage("orders.new", 5.0);
// Processing failed: redeliver
jsClient->nak(msg);

inProgress

Informs the server that the message is still being processed, resetting the redelivery timer.

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| message | JetStreamMessage | Yes | The message currently being processed. |

Returns: ()

Sample code:

nats:JetStreamMessage msg = check jsClient->consumeMessage("orders.new", 30.0);
// Signal work in progress before a slow operation
jsClient->inProgress(msg);
// ... long-running processing ...
jsClient->ack(msg);
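
The three acknowledgement operations are typically combined around a processing step. The following is a minimal sketch of that flow, using only the operations documented above; processOrder is a hypothetical handler introduced for illustration, and the subject and timeout values are assumptions.

```ballerina
import ballerina/io;
import ballerinax/nats;

// Hypothetical handler: rejects empty payloads, otherwise "processes" the order.
function processOrder(string payload) returns error? {
    if payload.trim().length() == 0 {
        return error("Empty order payload");
    }
    io:println("Processing order: ", payload);
}

public function main() returns error? {
    nats:Client natsClient = check new (nats:DEFAULT_URL);
    nats:JetStreamClient jsClient = check new (natsClient);

    // Pull a single message, waiting up to 5 seconds.
    nats:JetStreamMessage msg = check jsClient->consumeMessage("orders.new", 5.0);
    string payload = check string:fromBytes(msg.content);

    error? result = processOrder(payload);
    if result is error {
        // Processing failed: ask the server to redeliver the message.
        jsClient->nak(msg);
    } else {
        // Processing succeeded: the server will not redeliver the message.
        jsClient->ack(msg);
    }

    check natsClient->close();
}
```

Deciding between ack and nak only after the handler returns keeps a failed message eligible for redelivery.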