Concurrency

Ballerina provides built-in concurrency constructs designed for integration workloads. Rather than relying on low-level thread primitives, the language uses workers, strands, and structured concurrency patterns that make parallel execution, synchronization, and transactional flows safe and expressive.

Workers

Workers are named concurrent execution units within a function. Each worker runs on its own strand and can execute independently alongside other workers and the default worker.

import ballerina/http;

function fetchData() returns error? {
    // Statements outside the named workers run on the default worker
    string urlA = "https://api.example.com/a";

    worker workerA returns json|error {
        http:Client clientA = check new (urlA);
        json resp = check clientA->get("/data");
        return resp;
    }

    worker workerB returns json|error {
        http:Client clientB = check new ("https://api.example.com/b");
        json resp = check clientB->get("/data");
        return resp;
    }

    // Wait for both workers to complete
    json|error resultA = wait workerA;
    json|error resultB = wait workerB;
}

Worker message passing

Workers communicate via asynchronous message channels using the -> (send) and <- (receive) operators.

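import ballerina/io;

function pipeline() {
    worker producer {
        // Send and receive actions cannot be nested inside a loop, because the
        // compiler pairs each send with a receive statically. The values are
        // therefore sent as a single array.
        int[] values = [1, 2, 3, 4, 5];
        values -> consumer;
    }

    worker consumer {
        int[] values = <- producer;
        foreach int value in values {
            io:println("Received: ", value);
        }
    }
}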

Worker Send/Receive operators

| Operator | Syntax | Description |
| --- | --- | --- |
| Send | `expression -> workerName` | Send a value to the named worker without waiting for it to be received |
| Sync send | `expression ->> workerName` | Send and block until the receiver accepts |
| Receive | `<- workerName` | Receive a value from the named worker |
| Multiple receive | `<- {workerA, workerB}` | Receive from multiple workers as a record |
| Alternate receive | `<- workerA \| workerB` | Receive from whichever worker sends first |
| Flush | `flush workerName` | Wait until all sends to the named worker are accepted |
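
As a minimal sketch of how send, flush, and sync send fit together, the function below passes two integers from one worker to another. The function name `relay` and the worker names are hypothetical, chosen only for illustration. The example assumes a Swan Lake release and keeps every send and receive as a top-level statement of its worker.

```ballerina
import ballerina/io;

// Hypothetical example: `sender` passes two values to `receiver`,
// which returns their sum.
function relay() returns int|error {
    worker sender {
        // Asynchronous send: the sender continues without waiting.
        10 -> receiver;
        // Block until the message sent above has been received.
        error? flushed = flush receiver;
        // Synchronous send: block until `receiver` takes this value.
        error? sent = 32 ->> receiver;
        if flushed is error || sent is error {
            io:println("Send to receiver failed");
        }
    }

    worker receiver returns int {
        // Messages from one worker arrive in the order they were sent.
        int a = <- sender;
        int b = <- sender;
        return a + b;
    }

    // Waiting on a named worker yields its return value.
    return wait receiver;
}
```

The flush here is only for demonstration: the sync send that follows already blocks until its own message is taken.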

Strands

A strand is a lightweight logical thread of execution. Every worker runs on a strand. Strands within the same isolation group share the same thread, providing cooperative concurrency without data races.

// Start a function on a new strand
future<int> f = start computeValue(100);
int result = check wait f;
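
The fragment above can be placed in a complete function as follows. This is a sketch only: the body of `computeValue` is a hypothetical stand-in (the source does not define it), and `runAsync` is an illustrative name.

```ballerina
// Hypothetical stand-in for the `computeValue` function used above.
function computeValue(int n) returns int {
    return n * 2;
}

function runAsync() returns int|error {
    // `start` schedules the call on a new strand and returns immediately.
    future<int> f = start computeValue(100);

    // The calling strand can do other work here before it needs the result.

    // Waiting on a future<int> yields int|error, so the caller must
    // handle or propagate the error case.
    return wait f;
}
```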

Strand scheduling

| Concept | Description |
| --- | --- |
| Strand | Lightweight logical thread; the unit of concurrent execution |
| Default strand | The implicit strand in every function |
| Named worker strand | Each named worker gets its own strand |
| start expression | Creates a new strand for an async function call |
| Yield point | Strands yield at I/O operations and explicit wait calls |

Wait expressions

Wait expressions block the current strand until one or more asynchronous results are available.

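function parallelRequests() returns error? {
    // Start parallel calls (assuming each callServiceX function returns json|error)
    future<json|error> f1 = start callServiceA();
    future<json|error> f2 = start callServiceB();
    future<json|error> f3 = start callServiceC();

    // Wait for all: the result is a record with one field per future.
    // Each field can hold an error, so the field types include error.
    record {json|error f1; json|error f2; json|error f3;} results = wait {f1, f2, f3};

    // Wait for any one (alternate wait); check propagates the error case
    json firstResult = check wait f1|f2|f3;
}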

Wait expression types

| Form | Syntax | Returns |
| --- | --- | --- |
| Single wait | `wait futureExpr` | The resolved value or error |
| Multiple wait | `wait {f1, f2, ...}` | A record containing all results |
| Alternate wait | `wait f1 \| f2 \| ...` | The first completed result |
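
To illustrate the alternate wait, here is a small sketch in which two calls with different delays are raced against each other. The functions `slowCall`, `fastCall`, and `firstOf` are hypothetical, and the delays are simulated with `runtime:sleep` from `ballerina/lang.runtime` rather than real service calls.

```ballerina
import ballerina/lang.runtime;

// Hypothetical call that takes about one second.
function slowCall() returns string {
    runtime:sleep(1);
    return "slow";
}

// Hypothetical call that takes about a tenth of a second.
function fastCall() returns string {
    runtime:sleep(0.1);
    return "fast";
}

function firstOf() returns string|error {
    future<string> slow = start slowCall();
    future<string> fast = start fastCall();

    // Alternate wait: resumes as soon as one of the futures completes.
    return wait slow | fast;
}
```

With these delays `firstOf` is expected to return the result of `fastCall`. The slower strand is not cancelled by the wait; it simply finishes later and its result is ignored.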

Fork/Join

The fork statement creates a block in which multiple workers execute concurrently. After the fork block, you join the results using wait.

function aggregateData() returns json|error {
    fork {
        worker fetchOrders returns json|error {
            http:Client cl = check new ("https://orders.example.com");
            return check cl->get("/recent");
        }

        worker fetchInventory returns json|error {
            http:Client cl = check new ("https://inventory.example.com");
            return check cl->get("/status");
        }

        worker fetchShipping returns json|error {
            http:Client cl = check new ("https://shipping.example.com");
            return check cl->get("/tracking");
        }
    }

    // Join all results
    record {
        json|error fetchOrders;
        json|error fetchInventory;
        json|error fetchShipping;
    } results = wait {fetchOrders, fetchInventory, fetchShipping};

    json orders = check results.fetchOrders;
    json inventory = check results.fetchInventory;
    json shipping = check results.fetchShipping;

    return {orders, inventory, shipping};
}

Locks

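The lock statement provides mutual exclusion for accessing shared mutable state. While one strand is executing a lock block, no other strand can enter a lock block that guards the same state, so variables that are only ever accessed inside lock blocks are protected from concurrent modification.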

int counter = 0;

function incrementCounter() {
    lock {
        counter += 1;
    }
}

function getCounter() returns int {
    lock {
        return counter;
    }
}

Lock semantics

| Aspect | Behavior |
| --- | --- |
| Scope | Module-level or object-level mutable variables |
| Granularity | All lock blocks in the same scope share the same implicit lock |
| Nesting | Lock blocks can be nested; deadlock-free by design |
| Isolation | isolated functions and objects enforce lock discipline at compile time |
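
The following sketch shows the lock statement guarding a counter that two strands update at the same time. The names `hits`, `addHits`, and `countInParallel` are hypothetical. Because every access to `hits` happens inside a lock block, no increment should be lost however the strands are scheduled.

```ballerina
// Hypothetical shared counter, accessed only inside lock blocks.
int hits = 0;

function addHits(int times) {
    foreach int i in 0 ..< times {
        lock {
            hits += 1;
        }
    }
}

function countInParallel() returns int|error {
    lock {
        hits = 0;
    }

    // Two strands increment the same counter concurrently.
    future<()> f1 = start addHits(1000);
    future<()> f2 = start addHits(1000);

    // Waiting on a future can yield an error, so check both results.
    error? first = wait f1;
    error? second = wait f2;
    if first is error {
        return first;
    }
    if second is error {
        return second;
    }

    lock {
        return hits;
    }
}
```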

Isolated functions and objects

Ballerina's isolated qualifier enforces that shared state is always accessed within lock blocks at compile time.

isolated int requestCount = 0;

isolated function recordRequest() {
    lock {
        requestCount += 1;
    }
}

isolated class Counter {
    private int value = 0;

    isolated function increment() {
        lock {
            self.value += 1;
        }
    }

    isolated function get() returns int {
        lock {
            return self.value;
        }
    }
}

Transactions

Ballerina has first-class transaction support with transaction, commit, rollback, and retry statements.

function transferFunds(string fromAcc, string toAcc, decimal amount) returns error? {
    transaction {
        check debit(fromAcc, amount);
        check credit(toAcc, amount);
        check commit;
    }
}

Transaction with rollback and on-fail

import ballerina/log;

function processOrder(Order order) returns error? {
    transaction {
        check reserveInventory(order.items);
        check chargePayment(order.payment);
        check createShipment(order.address);

        // commit returns error?, so the outcome can be inspected directly
        var result = commit;
        if result is error {
            log:printError("Commit failed", result);
        }
    } on fail error e {
        // A failed check inside the block rolls the transaction back
        // automatically before control reaches this clause
        log:printError("Transaction failed, rolling back", e);
    }
}

Retry transactions

function reliableUpdate(string id, json data) returns error? {
    retry transaction {
        check updateExternalService(id, data);
        check commit;
    }
}

// Retry with custom policy. The type parameter must be a class that implements
// error:RetryManager; retryManager:ExponentialBackoff is assumed here to be
// such a class, supplied by a module of your own.
function retryWithBackoff(string id, json data) returns error? {
    retry<retryManager:ExponentialBackoff> (maxRetries = 3, initialDelay = 1) transaction {
        check updateExternalService(id, data);
        check commit;
    }
}

Transaction statements reference

| Statement | Description |
| --- | --- |
| `transaction { }` | Start a transaction block |
| `commit` | Commit the current transaction; returns error? |
| `rollback` | Explicitly roll back the current transaction |
| `retry transaction { }` | Retry the transaction on failure using the default retry policy |
| `retry<T>(...) transaction { }` | Retry with a custom retry manager type and parameters |
| `transactional` | Qualifier for functions that participate in the enclosing transaction |
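
The examples so far rely on automatic rollback. As a hedged sketch of the explicit rollback statement, the function below decides inside the transaction block whether to commit or roll back. The function `applyAdjustment` and its validation rule are hypothetical, and no real transactional resource is involved; the example only shows the control flow.

```ballerina
// Hypothetical example: commit for valid input, roll back otherwise.
function applyAdjustment(int amount) returns string|error {
    string outcome = "";
    transaction {
        if amount < 0 {
            // Abandon the transaction without raising an error.
            rollback;
            outcome = "rolled back";
        } else {
            // commit returns error?, so check propagates a failed commit.
            check commit;
            outcome = "committed";
        }
    }
    return outcome;
}
```

Each path through the block ends the transaction exactly once, either with commit or with rollback.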

Transactional functions

Functions marked transactional execute within the calling transaction context.

transactional function debit(string account, decimal amount) returns error? {
    sql:ExecutionResult _ = check dbClient->execute(
        `UPDATE accounts SET balance = balance - ${amount} WHERE id = ${account}`
    );
}

transactional function credit(string account, decimal amount) returns error? {
    sql:ExecutionResult _ = check dbClient->execute(
        `UPDATE accounts SET balance = balance + ${amount} WHERE id = ${account}`
    );
}

Concurrency patterns for integration

Scatter-Gather

Call multiple services in parallel and aggregate results.

function scatterGather() returns json|error {
    // httpA and httpB are assumed to be module-level http:Client instances
    fork {
        worker svcA returns json|error {
            return check httpA->get("/data");
        }
        worker svcB returns json|error {
            return check httpB->get("/data");
        }
    }

    record {json|error svcA; json|error svcB;} results = wait {svcA, svcB};
    return {
        a: check results.svcA,
        b: check results.svcB
    };
}

Competing consumers

Process messages from a queue using multiple workers.

function startConsumers(kafka:Consumer consumer) returns error? {
    foreach int i in 0 ..< 4 {
        _ = start processMessages(consumer, i);
    }
}

function processMessages(kafka:Consumer consumer, int workerId) returns error? {
    while true {
        kafka:ConsumerRecord[] records = check consumer->poll(1);
        foreach var rec in records {
            // Process each record
            check handleRecord(rec, workerId);
        }
    }
}
