
# Triggers

The ballerinax/kafka connector supports event-driven message consumption through a kafka:Listener that continuously polls Kafka topics and dispatches batches of records to your kafka:Service callback, eliminating the need for manual poll loops.

Three components work together:

| Component | Role |
|---|---|
| `kafka:Listener` | Continuously polls Kafka topics at a configurable interval and dispatches records to attached services. |
| `kafka:Service` | Defines the `onConsumerRecord` callback invoked for each batch of consumed records. |
| `kafka:Caller` | Provided in the callback to enable manual offset commits and seeking within the service handler. |

For action-based record operations, see the Action Reference.


## Listener

The kafka:Listener establishes the connection and manages event subscriptions.

### Configuration

The listener supports the following connection strategies:

| Config Type | Description |
|---|---|
| `ConsumerConfiguration` | The listener reuses the same `ConsumerConfiguration` as the `kafka:Consumer` client. Key fields for listener usage are shown below. |

ConsumerConfiguration fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `groupId` | `string?` | `()` | Consumer group identifier for coordinated consumption. |
| `topics` | `string\|string[]?` | `()` | Topic or list of topics to subscribe to. |
| `offsetReset` | `OffsetResetMethod?` | `()` | Strategy to use when no initial offset exists (`"earliest"`, `"latest"`, `"none"`). |
| `pollingInterval` | `decimal?` | `()` | Interval in seconds between consecutive polls. |
| `pollingTimeout` | `decimal?` | `()` | Timeout in seconds for each poll call. |
| `autoCommit` | `boolean` | `true` | Automatically commit offsets. Set to `false` for manual offset control via `kafka:Caller`. |
| `concurrentConsumers` | `int?` | `()` | Number of concurrent consumers for parallel processing. |
| `decoupleProcessing` | `boolean` | `false` | Decouple record processing from polling for improved throughput. |
| `secureSocket` | `SecureSocket?` | `()` | SSL/TLS configuration. |
| `auth` | `AuthenticationConfiguration?` | `()` | SASL authentication configuration. |
| `securityProtocol` | `SecurityProtocol` | `PROTOCOL_PLAINTEXT` | Security protocol used when communicating with brokers. |

### Initializing the listener

Basic listener with auto-commit disabled:

```ballerina
import ballerinax/kafka;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, {
    groupId: "my-group",
    topics: ["test-kafka-topic"],
    pollingInterval: 1,
    autoCommit: false
});
```
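The throughput-related fields from the configuration table (`pollingInterval`, `concurrentConsumers`, `decoupleProcessing`) can be combined in the same way. A sketch with illustrative values (the group and topic names are placeholders, not recommendations):

```ballerina
import ballerinax/kafka;

// Sketch: tune polling and concurrency for higher throughput.
listener kafka:Listener highThroughputListener = new (kafka:DEFAULT_URL, {
    groupId: "high-throughput-group",
    topics: ["high-volume-topic"],
    pollingInterval: 0.5,    // poll every 500 ms
    concurrentConsumers: 3,  // run three consumers in parallel
    decoupleProcessing: true // process records off the polling thread
});
```

Decoupled processing trades strict per-batch ordering for throughput, so it is best suited to workloads where records are independent.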

Listener with SASL authentication:

```ballerina
import ballerinax/kafka;

configurable string username = ?;
configurable string password = ?;
configurable string bootstrapServers = ?;

listener kafka:Listener kafkaListener = new (bootstrapServers, {
    groupId: "secure-group",
    topics: ["secure-topic"],
    auth: {
        mechanism: kafka:AUTH_SASL_PLAIN,
        username: username,
        password: password
    },
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
});
```

## Service

A kafka:Service is a Ballerina service attached to a kafka:Listener. It implements the onConsumerRecord callback which is invoked each time the listener polls a batch of records from the subscribed Kafka topic(s).

### Callback signatures

| Function | Signature | Description |
|---|---|---|
| `onConsumerRecord` | `remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error?` | Standard form: caller first, records second. |
| `onConsumerRecord` | `remote function onConsumerRecord(kafka:BytesConsumerRecord[] records, kafka:Caller caller) returns error?` | Parameter order can be reversed: records first, caller second. |
| `onConsumerRecord` | `remote function onConsumerRecord(kafka:BytesConsumerRecord[] records) returns error?` | `kafka:Caller` is optional; omit it when manual offset management is not needed. |
> **Note:** The records array type can be replaced with any typed Ballerina record array (`T[]`) for automatic payload deserialization. The `readonly` modifier can also be applied to the records parameter (e.g., `readonly & T[] records`).
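As an illustration of typed data binding, a service can declare the records parameter with a user-defined type and let the connector deserialize each payload. The `Order` record, topic, and group names below are hypothetical:

```ballerina
import ballerina/log;
import ballerinax/kafka;

// Hypothetical payload type; field names must match the JSON in the topic.
type Order record {|
    string orderId;
    decimal amount;
|};

listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {
    groupId: "typed-group",
    topics: ["order-topic"]
});

service on orderListener {
    // Each payload is deserialized into an Order value automatically;
    // a payload that does not match the type produces an error.
    remote function onConsumerRecord(Order[] orders) returns error? {
        foreach Order o in orders {
            log:printInfo("Received order", orderId = o.orderId);
        }
    }
}
```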

> **Note:** Use the `@kafka:Payload` annotation on the records parameter to explicitly mark it as the payload for data binding, particularly when the service method includes both a `kafka:Caller` and a typed record parameter:
>
> `remote function onConsumerRecord(kafka:Caller caller, @kafka:Payload MyRecord[] records) returns error?`
> **Note:** The `kafka:Caller` provides `commit()`, `commitOffset()`, and `seek()` remote functions for manual offset management within the service callback.
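A sketch of per-record offset management using `commitOffset()` (the listener configuration, topic, and group names are illustrative; committing `offset + 1` follows the usual Kafka convention that the committed offset is the next record to consume):

```ballerina
import ballerinax/kafka;

listener kafka:Listener manualListener = new (kafka:DEFAULT_URL, {
    groupId: "manual-offset-group",
    topics: ["events-topic"],
    autoCommit: false
});

service on manualListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? {
        foreach var rec in records {
            // ... process the record ...

            // Commit just this record's offset, plus one, so the consumer
            // resumes after it on restart.
            kafka:PartitionOffset committed = {
                partition: rec.offset.partition,
                offset: rec.offset.offset + 1
            };
            check caller->commitOffset([committed]);

            // To reprocess from a specific position instead, seek:
            // check caller->seek({partition: {topic: "events-topic", partition: 0}, offset: 0});
        }
    }
}
```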

### Full usage example

```ballerina
import ballerina/log;
import ballerinax/kafka;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, {
    groupId: "order-group",
    topics: ["order-topic"],
    pollingInterval: 1,
    autoCommit: false
});

service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) returns error? {
        foreach var rec in records {
            string value = check string:fromBytes(rec.value);
            log:printInfo("Received message", offset = rec.offset, value = value);
        }
        // Manually commit offsets after processing. Note the quoted
        // identifier: `commit` is a Ballerina keyword.
        check caller->'commit();
    }
}
```
> **Note:** Set `autoCommit: false` in the listener configuration when using manual offset commits via the `Caller` to avoid duplicate processing.


## Supporting types

### BytesConsumerRecord

| Field | Type | Description |
|---|---|---|
| `key` | `byte[]?` | Optional message key as a byte array. |
| `value` | `byte[]` | Message payload as a byte array. |
| `timestamp` | `int` | Record timestamp in epoch milliseconds. |
| `offset` | `PartitionOffset` | The partition and offset of this record. |
| `headers` | `map<byte[]\|byte[][]>` | Record headers; each key maps to a single value or a list of values. |

### AnydataConsumerRecord

| Field | Type | Description |
|---|---|---|
| `key` | `anydata?` | Optional message key. |
| `value` | `anydata` | Message payload. |
| `timestamp` | `int` | Record timestamp in epoch milliseconds. |
| `offset` | `PartitionOffset` | The partition and offset of this record. |
| `headers` | `map<byte[]\|byte[][]>` | Record headers; each key maps to a single value or a list of values. |

### TopicPartition

| Field | Type | Description |
|---|---|---|
| `topic` | `string` | The Kafka topic name. |
| `partition` | `int` | The partition number. |

### PartitionOffset

| Field | Type | Description |
|---|---|---|
| `partition` | `TopicPartition` | The topic-partition this offset belongs to. |
| `offset` | `int` | The offset position within the partition. |