
Streaming Capabilities

WSO2 Integrator provides a range of streaming capabilities for building event-driven integrations, processing large datasets efficiently, and implementing real-time communication patterns. These capabilities enable integrations to continuously consume, process, transform, and deliver data as it becomes available, making them suitable for use cases such as event-driven architectures, change data capture (CDC), messaging systems, large-file processing, and streaming APIs.

Unlike traditional request-response integrations that operate on complete datasets or messages, streaming integrations process data incrementally. This approach helps reduce memory consumption, improve responsiveness, and handle continuous or high-volume data flows more efficiently. Whether consuming events from a message broker, processing large database query results, transferring large files, or maintaining long-lived protocol connections, WSO2 Integrator provides a consistent programming model for building streaming-based integrations.

Note: This guide covers the streaming capabilities of the WSO2 Integrator: Default profile. Advanced stream analytics features are provided by the WSO2 Integrator: SI profile.

When do you need the SI profile?

The default profile handles most streaming integration needs. You may need the WSO2 Integrator: SI profile if your use case involves any of the following.

  • Event windowing — computing a metric over a rolling or fixed time window on a live stream, updated continuously as events arrive
    • Computing the total number of orders in the last 5 minutes
    • Computing an average sensor reading per session
  • Stream joins — correlating events from two separate live event streams based on a shared key and time window
    • Matching a shipment scan with its corresponding order event when both must arrive within a short time window
  • Complex Event Processing (CEP) — detecting when a specific sequence of events occurs, or when an expected event does not arrive within a time limit
    • Triggering a fraud alert when a login is followed by a large transfer within 2 minutes
    • Raising an alert if a machine's heartbeat stops arriving for 2 minutes
  • Pattern detection — identifying recurring patterns or trends across a sequence of consecutive events
    • Alerting when three consecutive production-batch outputs are declining
    • Alerting when a sensor reading exceeds a threshold for five consecutive readings
  • Incremental aggregations — automatically maintaining pre-computed summaries at multiple time granularities simultaneously, so they can be queried later
    • Maintaining per-minute, per-hour, and per-day sales totals so a dashboard can query any resolution without reprocessing raw events

Building blocks of stream processing

This section outlines the foundational components used to implement streaming integrations within the WSO2 Integrator: Default profile.

Streams

The stream<T, E?> type is the unifying primitive that the other in-process streaming capabilities (database query results, data-format parsers, file I/O) rely on. A stream is a lazy, ordered sequence of values of type T that either completes normally or terminates with an error of type E. Streams can be bounded (e.g., a file's lines) or unbounded (e.g., a generator that produces values on demand). Consumers pull values one element at a time, so the full sequence never has to be held in memory, and a stream can be consumed only once.

import ballerina/io;

// From an array
int[] numbers = [1, 2, 3, 4, 5];
stream<int> intStream = numbers.toStream();

// From a file as a block stream (each block is a byte[])
stream<io:Block, io:Error?> blockStream =
    check io:fileReadBlocksAsStream("large-input.bin");

// From a CSV file as records
type Order record {| string id; decimal amount; |};
stream<Order, io:Error?> orders =
    check io:fileReadCsvAsStream("orders.csv");
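A stream can also be backed by your own object, which is the simplest way to see the unbounded case mentioned above. The following is a minimal sketch in which EvenNumberGenerator and firstEvens are hypothetical names: the stream constructor (new, with the stream type taken from the declaration) calls the object's next() method each time a consumer pulls a value. Because next() never returns (), the stream never completes on its own, so the consumer decides how many values to take.

```ballerina
import ballerina/io;

// Hypothetical generator: yields 0, 2, 4, ... and never returns (),
// so the stream it backs is unbounded.
class EvenNumberGenerator {
    int current = 0;

    // Called by the stream each time a consumer pulls the next value.
    public isolated function next() returns record {| int value; |}|error? {
        int n = self.current;
        self.current += 2;
        return {value: n};
    }
}

// Hypothetical helper: pulls only the first `count` values from the stream.
function firstEvens(int count) returns int[]|error {
    stream<int, error?> evens = new (new EvenNumberGenerator());
    int[] result = [];
    foreach int _ in 0 ..< count {
        record {| int value; |}? item = check evens.next();
        if item is record {| int value; |} {
            result.push(item.value);
        }
    }
    return result;
}

public function main() returns error? {
    io:println(check firstEvens(5));
}
```

Values are produced only when next() is called, so taking five elements from an infinite source is safe.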

Transforming with query expressions

Query expressions (from … where … select) work directly on streams, producing a new stream without buffering the input.

stream<Order, io:Error?> highValue = from Order o in orders
    where o.amount > 1000.0d
    select o;
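Because select can build a new value for each element, the same pattern reshapes records as well as filtering them. The sketch below is self-contained: Flagged and flagOrders are hypothetical names, and an in-memory array converted with toStream() stands in for the CSV file so the example runs on its own.

```ballerina
import ballerina/io;

type Order record {| string id; decimal amount; |};

// Hypothetical output shape produced by the query.
type Flagged record {| string id; boolean highValue; |};

function flagOrders(Order[] data) returns Flagged[]|error {
    stream<Order, error?> orders = data.toStream();

    // The query yields a new stream of Flagged values rather than an array.
    stream<Flagged, error?> flagged = from Order o in orders
        select {id: o.id, highValue: o.amount > 1000.0d};

    // Materialize the result only because this demo data is small.
    return from Flagged f in flagged select f;
}

public function main() returns error? {
    Flagged[] result = check flagOrders([
        {id: "O-1", amount: 1500.0d},
        {id: "O-2", amount: 20.0d}
    ]);
    io:println(result);
}
```

In a real integration you would keep working with the flagged stream (for example, in a query action) instead of collecting it into an array.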

Aggregating with collect

decimal total = check from var {amount} in orders
    collect sum(amount);

Side effects with query actions

Use do { … } instead of select to run actions per element:

check from Order o in orders
    where o.amount > 1000.0d
    do {
        check publishHighValueOrder(o);
    };
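When the body of a do clause uses check, the first error ends the query action and becomes its result, so the remaining elements are not processed. The following minimal sketch makes that visible; validateAmount and handleAll are hypothetical helpers, and an in-memory array stands in for a real source.

```ballerina
import ballerina/io;

type Order record {| string id; decimal amount; |};

// Hypothetical per-order action that rejects non-positive amounts.
function validateAmount(Order o) returns error? {
    if o.amount <= 0.0d {
        return error(string `invalid amount for order ${o.id}`);
    }
}

// Returns the IDs handled before the first failure, plus the query action's result.
function handleAll(Order[] data) returns [string[], error?] {
    stream<Order, error?> orders = data.toStream();
    string[] handled = [];
    error? outcome = from Order o in orders
        do {
            // On error, iteration stops and the error becomes `outcome`.
            check validateAmount(o);
            handled.push(o.id);
        };
    return [handled, outcome];
}

public function main() {
    [string[], error?] [handled, outcome] = handleAll([
        {id: "O-1", amount: 10.0d},
        {id: "O-2", amount: 0.0d},
        {id: "O-3", amount: 5.0d}
    ]);
    io:println("handled: ", handled);
    if outcome is error {
        io:println("stopped: ", outcome.message());
    }
}
```

Prefixing the query action with check, as in the example above, simply propagates that error to the enclosing function.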

Closing streams and error handling

For streams that hold external resources (database cursors, file handles, sockets), call close() when finished. The E? parameter on stream<T, E?> propagates errors that occur during iteration.

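import ballerina/io;
import ballerina/log;

public function main() returns error? {
    stream<Order, io:Error?> orders = check io:fileReadCsvAsStream("orders.csv");
    do {
        check from Order o in orders
            do {
                check processOrder(o);
            };
    } on fail error e {
        log:printError("stream terminated", 'error = e);
    }
    // Release the file handle whether iteration completed or failed.
    check orders.close();
}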
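Calling close() on a stream is forwarded to the object that backs it, which is what gives resource-holding streams a chance to release their handles. In this sketch, ResourceBackedSource and readFirstAndClose are hypothetical: the class stands in for a cursor or file handle and, besides next(), defines an optional close() method that the stream invokes when the consumer closes the stream before reaching the end.

```ballerina
import ballerina/io;

// Hypothetical stand-in for a resource-holding source such as a cursor or file handle.
class ResourceBackedSource {
    int remaining;
    boolean released = false;

    function init(int size) {
        self.remaining = size;
    }

    // Yields "row-<n>" values counting down, then () to signal completion.
    public isolated function next() returns record {| string value; |}|error? {
        if self.remaining == 0 {
            return ();
        }
        self.remaining -= 1;
        return {value: string `row-${self.remaining}`};
    }

    // Called by the stream when the consumer invokes close() on it.
    public isolated function close() returns error? {
        self.released = true;
    }
}

// Reads one row, closes the stream early, and reports whether the source was released.
function readFirstAndClose(int size) returns [string?, boolean]|error {
    ResourceBackedSource src = new (size);
    stream<string, error?> rows = new (src);

    string? firstRow = ();
    record {| string value; |}? first = check rows.next();
    if first is record {| string value; |} {
        firstRow = first.value;
    }

    // Stop early; the remaining rows are never read.
    check rows.close();
    return [firstRow, src.released];
}

public function main() returns error? {
    [string?, boolean] [firstRow, released] = check readFirstAndClose(3);
    io:println("first row: ", firstRow, ", released: ", released);
}
```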

For the full query clause reference, see Query Expressions.

Service/listener model

Event-driven integrations are built around two building blocks: listeners that own the protocol concerns (connection, subscription, acknowledgement, retries, partition rebalance) and services that hold the business logic. This separation means streaming concerns like backpressure and delivery guarantees are handled by the listener — the service just receives events.

import ballerinax/kafka;

listener kafka:Listener orderListener = check new (kafka:DEFAULT_URL, {
    groupId: "order-processors",
    topics: ["orders"]
});

service "OrderConsumer" on orderListener {
    remote function onConsumerRecord(Order[] orders) returns error? {
        foreach Order o in orders {
            check processOrder(o);
        }
    }
}

Listeners exist for every broker, every event source, and every streaming network protocol covered below. See the Build an Event-Driven Integration quick start for a complete walkthrough.

Streaming capability categories

| Category | What it covers | Connectors |
| --- | --- | --- |
| Database query streaming | Result sets returned as lazy streams of records | MySQL, PostgreSQL, Microsoft SQL Server (MSSQL), OracleDB, Snowflake, JDBC |
| CSV streaming | Streaming CSV parser that yields one record at a time | CSV |
| Message brokers | Distributed event streaming and messaging | Kafka, RabbitMQ, MQTT, NATS, JMS, ASB, Solace, AWS SQS, AWS SNS |
| Change Data Capture and SaaS event sources | Database change events, cloud/SaaS events | CDC, Salesforce, Email, DynamoDB Streams, GitHub Trigger |
| File / object transfer streaming | Memory-efficient streaming over file transfer protocols | I/O, FTP, SMB |
| Streaming network protocols | Long-lived bidirectional / server-streamed connections | WebSocket, gRPC, HTTP (SSE), GraphQL (subscriptions), UDP |

Database query streaming (SQL)

The query() method of each database connector returns its result set as a stream<T, sql:Error?>, where T is the record type that each row is mapped to (taken from the type you assign the result to). This lets you process arbitrarily large result sets without loading them into memory.

Iterating a query stream

import ballerina/sql;
import ballerinax/mysql;

type Customer record {| int id; string name; string email; |};

mysql:Client db = check new ("localhost", "user", "pass", "shop", 3306);

public function main() returns error? {
    stream<Customer, sql:Error?> customers =
        db->query(`SELECT id, name, email FROM customers`);

    check from Customer c in customers
        do {
            check sendWelcomeEmail(c);
        };
}

Composing with query expressions

A SQL result stream is just a stream<...>, so it composes naturally with the language-level query syntax:

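// Row type for this query; `cutoff` is a timestamp value defined elsewhere.
type OrderRow record {| string id; decimal amount; string status; |};

stream<OrderRow, sql:Error?> orderStream =
    db->query(`SELECT id, amount, status FROM orders WHERE created_at > ${cutoff}`);

decimal totalRevenue = check from var {status, amount} in orderStream
    where status == "paid"
    collect sum(amount);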

query() vs. queryRow()

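Use query() (which returns a stream) for result sets that may contain many rows. Use queryRow() (which returns a single record or scalar value) when you expect at most one row, for example a primary-key lookup or a COUNT(*) query. If the query produces no rows, queryRow() returns an sql:NoRowsError.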

For per-database actions and configuration, see the connector docs: MySQL, PostgreSQL, MSSQL, Oracle Database.

CSV streaming

For very large CSV files, the CSV connector exposes parseToStream, which incrementally parses a stream of byte blocks and yields one record at a time. This keeps memory usage bounded regardless of file size, and the resulting record stream composes naturally with query expressions.

import ballerina/data.csv;
import ballerina/io;

type Order record {| string id; decimal amount; string currency; |};

public function main() returns error? {
    stream<byte[], io:Error?> csvBytes = check io:fileReadBlocksAsStream("orders.csv");

    stream<Order, csv:Error?> orders = check csv:parseToStream(csvBytes);

    check from Order o in orders
        where o.currency == "USD"
        do {
            check processOrder(o);
        };
}

csv:parseStream also consumes a byte block stream, but it returns the full result as an array, so it is not memory-efficient for large files. Use parseToStream when you need true record-level streaming. See CSV for the format reference.

Message brokers

Each broker connector provides a producer (or client) for publishing messages and a listener for consuming them. The listener-driven services described in Service/listener model apply uniformly across all of them.

| Connector | Broker | Delivery semantics | Capabilities |
| --- | --- | --- | --- |
| Kafka | Apache Kafka | At-least-once, exactly-once (transactions) | Consumer groups, SASL/SSL, Avro, GraalVM-compatible |
| RabbitMQ | RabbitMQ (AMQP 0-9-1) | At-least-once | Direct/Fanout/Topic/Headers exchanges, client ack |
| MQTT | MQTT brokers | QoS 0/1/2 | Last-will, retained messages, IoT-oriented |
| NATS | NATS / JetStream | At-most-once (core) / at-least-once (JetStream) | Publish-subscribe, request-reply, and load-balanced queues, JetStream for persistent messaging |
| JMS | JMS providers (ActiveMQ, Artemis) | At-least-once | Queues, topics, durable subscribers |
| ASB | Azure Service Bus | At-least-once | Sessions, dead-lettering, batch operations |
| Solace | Solace PubSub+ | At-least-once | Pub/Sub, request/reply, queuing modes |
| AWS SQS | Amazon SQS | At-least-once (FIFO: exactly-once) | Standard and FIFO queues |
| AWS SNS | Amazon SNS | Topic fan-out | Pub/sub with topic subscriptions |

Kafka example

import ballerinax/kafka;

public function main() returns error? {
    // Producer
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    check producer->send({topic: "orders", value: {id: "O-1", amount: 99.99}});
}

// Consumer
listener kafka:Listener kafkaListener = check new (kafka:DEFAULT_URL, {
    groupId: "order-processors",
    topics: ["orders"]
});

service "Orders" on kafkaListener {
    remote function onConsumerRecord(Order[] records) returns error? {
        foreach Order o in records {
            check processOrder(o);
        }
    }
}

RabbitMQ example

import ballerinax/rabbitmq;

public function main() returns error? {
    // Producer
    rabbitmq:Client rmqClient = check new (rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);
    check rmqClient->publishMessage({
        routingKey: "order.created",
        exchange: "orders-exchange",
        content: {id: "O-1", amount: 99.99}
    });
}

// Consumer
listener rabbitmq:Listener rmqListener = check new (rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

service "orders" on rmqListener {
    remote function onMessage(Order o) returns error? {
        check processOrder(o);
    }
}

For per-broker artifact pages with the full creation workflow, see the Event-Driven Integration section.

Change Data Capture and SaaS event sources

Beyond message brokers, the default profile exposes streams of events from databases and SaaS systems as first-class listeners.

| Connector | Source | What it emits |
| --- | --- | --- |
| CDC, Microsoft SQL Server (MSSQL) | MSSQL CDC | INSERT/UPDATE/DELETE events from a SQL Server table |
| CDC, PostgreSQL | PostgreSQL CDC | Logical replication change events |
| CDC, MySQL | MySQL CDC | Binlog-based change events |
| DynamoDB Streams | AWS DynamoDB Streams | Item-level changes from a DynamoDB table |
| Salesforce | Salesforce events | Platform events, change data capture events |
| GitHub Trigger | GitHub Webhooks | Repository, PR, issue events |
| Email | POP3 / IMAP4 | New email messages |
| Twilio Trigger | Twilio | SMS, voice events |

CDC example (PostgreSQL)

import ballerinax/cdc;
import ballerinax/postgresql;
import ballerinax/postgresql.cdc.driver as _;

listener postgresql:CdcListener postgresqlCdcListener = new (database = {
    hostname: "localhost",
    port: 5432,
    username: "postgres",
    password: "password",
    databaseName: "shop"
});

// PostgreSQL tables are identified as <schema>.<table>.
@cdc:ServiceConfig {
    tables: "public.orders"
}
service cdc:Service on postgresqlCdcListener {
    // Called for each row inserted into the table.
    remote function onCreate(Order afterEntry, string tableName) returns error? {
    }

    // Called for each updated row, with the row state before and after the change.
    remote function onUpdate(Order beforeEntry, Order afterEntry, string tableName) returns error? {
    }

    // Called for each deleted row, with the row state before deletion.
    remote function onDelete(Order beforeEntry, string tableName) returns error? {
    }
}

For per-source guides, see the artifact pages: CDC for PostgreSQL, CDC for MSSQL, Salesforce events, GitHub webhooks, POP3/IMAP4.

File / object transfer streaming

For workflows involving large files, the default profile exposes file contents as streams of byte blocks (or typed records) so files can be read, transformed, and written without buffering in memory.

| Connector | Streaming surface |
| --- | --- |
| I/O | fileReadBlocksAsStream, fileReadCsvAsStream, fileWriteBlocksFromStream for local files |
| FTP | ftp:Client->getBytesAsStream / put over FTP and SFTP; ftp:Listener for file-arrival events |
| SMB | smb:Client->getBytesAsStream / put over SMB; smb:Listener for file-arrival events |

Download from FTP as a byte stream

ftp:Client->getBytesAsStream reads a remote file as a stream<byte[], io:Error?>, letting you process the file block by block.

import ballerina/ftp;

ftp:Client ftpClient = check new ({
    host: "ftp.example.com",
    auth: {credentials: {username: "user", password: "***"}}
});

public function main() returns error? {
    stream<byte[], error?> byteStream = check ftpClient->getBytesAsStream("/exports/orders.csv");
    record {| byte[] value; |}? nextBytes = check byteStream.next();
    while nextBytes is record {| byte[] value; |} {
        check processBlock(nextBytes.value);
        nextBytes = check byteStream.next();
    }
    check byteStream.close();
}

Download from an SMB share as a byte stream

The SMB connector mirrors the FTP client surface: smb:Client->getBytesAsStream returns a byte block stream from a remote SMB share.

import ballerina/smb;

smb:Client smbClient = check new ({
    host: "fileserver.local",
    auth: {credentials: {username: "user", password: "***"}},
    share: "Shared"
});

public function main() returns error? {
    stream<byte[], error?> byteStream = check smbClient->getBytesAsStream("/share/exports/orders.csv");
    record {| byte[] value; |}? nextBytes = check byteStream.next();
    while nextBytes is record {| byte[] value; |} {
        check processBlock(nextBytes.value);
        nextBytes = check byteStream.next();
    }
    check byteStream.close();
}
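In both loops above, a failing processBlock call makes check return from main before byteStream.close() runs, so the underlying connection or file handle may not be released. One way to guarantee cleanup is sketched below under the assumption that the download is any stream<byte[], error?>: consumeBlocks is a hypothetical helper that drains the stream with a query action inside a do/on fail block and always closes it afterwards. An in-memory array stands in for the getBytesAsStream result so the sketch runs without a server; with a real connector you would pass the downloaded stream instead.

```ballerina
import ballerina/io;

// Hypothetical helper: passes every block to `handler`, then closes the stream
// whether iteration completed or stopped on an error.
function consumeBlocks(stream<byte[], error?> blocks,
        function (byte[]) returns error? handler) returns error? {
    error? failure = ();
    do {
        check from byte[] block in blocks
            do {
                check handler(block);
            };
    } on fail error e {
        failure = e;
    }
    error? closeResult = blocks.close();
    // Report the processing error first; otherwise surface any close error.
    if failure is error {
        return failure;
    }
    return closeResult;
}

public function main() returns error? {
    // In-memory blocks stand in for getBytesAsStream so the sketch runs offline.
    byte[][] chunks = [[1, 2, 3], [4, 5]];
    int[] sizes = [];
    check consumeBlocks(chunks.toStream(), function (byte[] block) returns error? {
        sizes.push(block.length());
    });
    io:println("block sizes: ", sizes);
}
```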

For file-arrival events, both modules also expose a Listener. For the file-streaming deep dive, see Streaming Large Files and FTP / SFTP.

Streaming network protocols

Some protocols are inherently streaming: the connection itself is long-lived, and data flows as a continuous sequence of frames or messages.

| Module | Protocol | Streaming model |
| --- | --- | --- |
| WebSocket | WebSocket | Full-duplex text/binary frames over a single connection |
| gRPC | gRPC | Unary, server-streaming, client-streaming, and bidirectional streaming RPCs |
| HTTP (SSE) | HTTP (SSE) | Server-to-client push as a stream<http:SseEvent, error?> over a long-lived HTTP connection |
| GraphQL | GraphQL subscriptions | A subscribe resolver returns a stream<T, error?>, typically carried over WebSocket |
| UDP | UDP | Connectionless datagram send/receive for high-throughput unordered streams |

WebSocket uses a listener/frame model, in which the service handles each incoming frame in a callback. gRPC streaming RPCs, HTTP SSE, and GraphQL subscriptions instead surface a literal stream<...> value, tying directly back to streams.

WebSocket example

import ballerina/websocket;

service /chat on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new ChatService();
    }
}

service class ChatService {
    *websocket:Service;

    remote function onTextMessage(websocket:Caller caller, string msg) returns error? {
        check caller->writeTextMessage("echo: " + msg);
    }
}

gRPC server-streaming example

import ballerina/grpc;

// GRPC_SERVER_STREAMING_DESC is the descriptor constant that the `bal grpc` tool
// generates from the service's .proto definition.
@grpc:Descriptor {
    value: GRPC_SERVER_STREAMING_DESC
}
service "HelloWorld" on new grpc:Listener(9090) {
    remote function lotsOfReplies(string name) returns stream<string, error?> {
        // Build one greeting per prefix, then stream them back to the client.
        string[] greets = from string greet in ["Hi", "Hey", "GM"]
            select greet + " " + name;
        return greets.toStream();
    }
}

HTTP server-sent events (SSE) example

The service's resource function returns a stream<http:SseEvent, error?>, and the client consumes a stream of the same type, so the server pushes each event as it becomes available and the client never has to poll.

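import ballerina/http;

// Service: push a stream of events to the client
service /stocks on new http:Listener(9090) {
    resource function get .() returns stream<http:SseEvent, error?> {
        return new stream<http:SseEvent, error?>(new StockPriceGenerator());
    }
}

// Hypothetical generator: emits five sample price events, then completes.
class StockPriceGenerator {
    int count = 0;

    public isolated function next() returns record {| http:SseEvent value; |}|error? {
        if self.count >= 5 {
            return ();
        }
        self.count += 1;
        return {value: {data: string `${100 + self.count}`}};
    }
}

A separate client program consumes the same stream type:

import ballerina/http;
import ballerina/io;

// Client: consume the event stream
http:Client stocksClient = check new ("http://localhost:9090");

public function main() returns error? {
    stream<http:SseEvent, error?> eventStream = check stocksClient->/stocks;
    check from http:SseEvent event in eventStream
        do {
            io:println("Stock price: ", event.data);
        };
}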

GraphQL subscription example

A subscribe resolver returns a stream<T, error?>; each value the stream yields is delivered to subscribed clients (over WebSocket).

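import ballerina/graphql;

service /graphql on new graphql:Listener(9000) {
    // A GraphQL schema needs at least one query field alongside subscriptions.
    resource function get greeting() returns string {
        return "Hello";
    }

    // Each value the stream yields is pushed to subscribed clients.
    resource function subscribe names() returns stream<string, error?> {
        return ["Walter", "Skyler", "Jesse"].toStream();
    }
}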

See Supported Protocols for the full protocol matrix.

See also