Actions
The ballerinax/kafka package exposes the following clients:
| Client | Purpose |
|---|---|
Producer | Publishes messages to Kafka topics with configurable serialization, compression, and delivery guarantees. |
Consumer | Subscribes to Kafka topics and polls for messages with manual offset management. |
For event-driven integration, see the Trigger Reference.
Producer
Publishes messages to Kafka topics with configurable serialization, compression, and delivery guarantees.
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
acks | ProducerAcks | ACKS_SINGLE | Number of acknowledgments the producer requires ("all", "0", or "1"). |
compressionType | CompressionType | COMPRESSION_NONE | Compression algorithm for messages ("none", "gzip", "snappy", "lz4", "zstd"). |
clientId | string? | () | Identifier sent to the broker for logging and monitoring. |
keySerializerType | SerializerType | SER_BYTE_ARRAY | Serializer for message keys. |
valueSerializerType | SerializerType | SER_BYTE_ARRAY | Serializer for message values. |
transactionalId | string? | () | Transactional ID for exactly-once delivery. |
enableIdempotence | boolean | false | Enable idempotent producer for exactly-once semantics. |
retryCount | int? | () | Number of retries for failed send attempts. |
batchSize | int? | () | Maximum number of bytes to batch before sending. |
linger | decimal? | () | Time in seconds to wait for additional messages before sending a batch. |
requestTimeout | decimal? | () | Time in seconds to wait for a response from the broker. |
schemaRegistryUrl | string? | () | URL of the Confluent Schema Registry for Avro serialization. |
secureSocket | SecureSocket? | () | SSL/TLS configuration for encrypted connections. |
auth | AuthenticationConfiguration? | () | SASL authentication configuration. |
securityProtocol | SecurityProtocol | PROTOCOL_PLAINTEXT | Security protocol ("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL"). |
additionalProperties | map<string>? | () | Additional Kafka producer properties not covered by named fields. |
Initializing the client
import ballerinax/kafka;
kafka:Producer producer = check new (kafka:DEFAULT_URL, {
clientId: "my-producer",
acks: kafka:ACKS_ALL,
retryCount: 3
});
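The `transactionalId` and `enableIdempotence` fields enable exactly-once publishing. A minimal sketch, assuming a broker at `kafka:DEFAULT_URL` and an illustrative topic name (sends inside a Ballerina `transaction` block are committed atomically when the transaction commits):
```ballerina
import ballerinax/kafka;

public function main() returns error? {
    kafka:Producer producer = check new (kafka:DEFAULT_URL, {
        clientId: "my-transactional-producer",
        transactionalId: "order-tx",  // enables transactional delivery
        enableIdempotence: true       // required for exactly-once semantics
    });
    transaction {
        check producer->send({
            topic: "test-kafka-topic",
            value: "Order created".toBytes()
        });
        // Commits the Kafka transaction together with the Ballerina transaction.
        check commit;
    }
    check producer->close();
}
```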
Operations
Produce messages
send
Sends a message to a Kafka topic.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
producerRecord | AnydataProducerRecord | Yes | The record containing the topic, key, value, and optional headers. |
Returns: error?
Sample code:
check producer->send({
topic: "test-kafka-topic",
key: "order-1".toBytes(),
value: "Hello World, Ballerina".toBytes()
});
sendWithMetadata
Sends a message and returns the record metadata (partition, offset, timestamp).
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
producerRecord | AnydataProducerRecord | Yes | The record containing the topic, key, value, and optional headers. |
Returns: kafka:RecordMetadata|error
Sample code:
kafka:RecordMetadata metadata = check producer->sendWithMetadata({
topic: "test-kafka-topic",
value: "Hello Kafka".toBytes()
});
Sample response:
{"topic": "test-kafka-topic", "partition": 0, "offset": 42, "timestamp": 1700000000000, "serializedKeySize": -1, "serializedValueSize": 11}
offset and timestamp can be () (null) depending on the delivery configuration. Always handle them as nullable (int?) in your code.
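A defensive read of those nullable fields might look like this (a sketch; the topic name is illustrative):
```ballerina
kafka:RecordMetadata metadata = check producer->sendWithMetadata({
    topic: "test-kafka-topic",
    value: "Hello Kafka".toBytes()
});
// offset and timestamp are nullable; narrow before use.
int? offset = metadata.offset;
if offset is int {
    // The broker reported the record's offset.
}
```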
flush
Flushes all buffered messages, blocking until all sends are complete.
Parameters: None.
Returns: error?
Sample code:
check producer->'flush();
Topic metadata
getTopicPartitions
Returns the partition metadata for a given topic.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
topic | string | Yes | The Kafka topic name. |
Returns: kafka:TopicPartition[]|error
Sample code:
kafka:TopicPartition[] partitions = check producer->getTopicPartitions("test-kafka-topic");
Sample response:
[{"topic": "test-kafka-topic", "partition": 0}, {"topic": "test-kafka-topic", "partition": 1}, {"topic": "test-kafka-topic", "partition": 2}]
Lifecycle
close
Closes the producer and releases all resources.
Parameters: None.
Returns: error?
Sample code:
check producer->close();
Consumer
Subscribes to Kafka topics and polls for messages with manual offset management.
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
groupId | string? | () | Consumer group identifier for coordinated consumption. |
topics | string\|string[]? | () | Topic name or array of topic names to subscribe to when the consumer is initialized. |
offsetReset | OffsetResetMethod? | () | Strategy when no initial offset exists ("earliest", "latest", "none"). |
keyDeserializerType | DeserializerType | DES_BYTE_ARRAY | Deserializer for message keys. |
valueDeserializerType | DeserializerType | DES_BYTE_ARRAY | Deserializer for message values. |
autoCommit | boolean | true | Automatically commit offsets at regular intervals. |
autoCommitInterval | decimal? | () | Interval in seconds between automatic offset commits. |
sessionTimeout | decimal? | () | Timeout in seconds for detecting consumer failures. |
heartBeatInterval | decimal? | () | Interval in seconds between heartbeats to the consumer coordinator. |
maxPollRecords | int? | () | Maximum number of records returned per poll call. |
isolationLevel | IsolationLevel? | () | Controls how transactional messages are read ("read_committed" or "read_uncommitted"). |
schemaRegistryUrl | string? | () | URL of the Confluent Schema Registry for Avro deserialization. |
pollingTimeout | decimal? | () | Timeout in seconds for each poll call. |
pollingInterval | decimal? | () | Interval in seconds between consecutive polls (used with Listener). |
secureSocket | SecureSocket? | () | SSL/TLS configuration for encrypted connections. |
auth | AuthenticationConfiguration? | () | SASL authentication configuration. |
securityProtocol | SecurityProtocol | PROTOCOL_PLAINTEXT | Security protocol ("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL"). |
additionalProperties | map<string>? | () | Additional Kafka consumer properties not covered by named fields. |
validation | boolean | true | Enable constraint validation on deserialized records. |
decoupleProcessing | boolean | false | Decouple record processing from polling for improved throughput. |
autoSeekOnValidationFailure | boolean | true | Automatically seek past records that fail data-binding or constraint validation. Set to false to stop and surface the error instead. |
Initializing the client
import ballerinax/kafka;
kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
groupId: "my-group",
topics: ["test-kafka-topic"],
offsetReset: kafka:OFFSET_RESET_EARLIEST
});
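With `autoCommit: false`, offsets are committed explicitly after processing. A minimal poll loop under those assumptions (broker at `kafka:DEFAULT_URL`, topic name illustrative):
```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "my-group",
        topics: ["test-kafka-topic"],
        autoCommit: false  // take over offset management
    });
    while true {
        kafka:BytesConsumerRecord[] records = check consumer->poll(5);
        foreach var rec in records {
            io:println(check string:fromBytes(rec.value));
        }
        // Commit only after every record in the batch is processed, so a
        // crash mid-batch causes redelivery rather than data loss.
        check consumer->'commit();
    }
}
```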
Operations
Subscribe & assign
subscribe
Subscribes the consumer to one or more topics.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
topics | string\|string[] | Yes | A topic name or an array of topic names to subscribe to. |
Returns: error?
groupId must be set in the consumer configuration before calling subscribe(). Calling this method without a groupId causes a panic, not a returned error, and cannot be caught with check.
Sample code:
check consumer->subscribe(["topic-1", "topic-2"]);
subscribeWithPattern
Subscribes to all topics matching a regex pattern.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
regex | string | Yes | Regular expression pattern for topic names. |
Returns: error?
Sample code:
check consumer->subscribeWithPattern("order-.*");
unsubscribe
Unsubscribes from all currently subscribed topics.
Parameters: None.
Returns: error?
Sample code:
check consumer->unsubscribe();
assign
Manually assigns specific topic-partitions to this consumer.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Array of topic-partition pairs to assign. |
Returns: error?
Sample code:
check consumer->assign([{topic: "test-topic", partition: 0}]);
Poll messages
poll
Polls for consumer records, returning full record metadata including key, value, offset, and headers.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
timeout | decimal | Yes | Maximum time in seconds to block waiting for records. |
T | typedesc<AnydataConsumerRecord[]> | No | Expected consumer record array type for deserialization. |
Returns: T|error
Sample code:
kafka:BytesConsumerRecord[] records = check consumer->poll(1);
foreach var rec in records {
string value = check string:fromBytes(rec.value);
}
Sample response:
[{"key": "order-1", "value": [72, 101, 108, 108, 111], "timestamp": 1700000000000, "offset": {"partition": {"topic": "test-kafka-topic", "partition": 0}, "offset": 42}, "headers": {}}]
pollPayload
Polls for message payloads only, without record metadata.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
timeout | decimal | Yes | Maximum time in seconds to block waiting for records. |
T | typedesc<anydata[]> | No | Expected payload array type for deserialization. |
Returns: T|error
Sample code:
string[] payloads = check consumer->pollPayload(1);
Sample response:
["Hello World, Ballerina", "Order received", "Payment processed"]
Offset management
commit
Commits the current offsets for all subscribed partitions.
Parameters: None.
Returns: error?
Sample code:
check consumer->'commit();
commitOffset
Commits specific offsets for specific partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
offsets | PartitionOffset[] | Yes | Array of partition-offset pairs to commit. |
duration | decimal | No | Timeout in seconds for the commit operation. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: error?
Sample code:
check consumer->commitOffset(
[{partition: {topic: "test-topic", partition: 0}, offset: 100}],
10
);
seek
Seeks to a specific offset in a topic-partition.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
offset | PartitionOffset | Yes | The partition and offset to seek to. |
Returns: error?
Sample code:
check consumer->seek({
partition: {topic: "test-topic", partition: 0},
offset: 0
});
seekToBeginning
Seeks to the beginning of the specified partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to seek to the beginning. |
Returns: error?
Sample code:
check consumer->seekToBeginning([{topic: "test-topic", partition: 0}]);
seekToEnd
Seeks to the end of the specified partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to seek to the end. |
Returns: error?
Sample code:
check consumer->seekToEnd([{topic: "test-topic", partition: 0}]);
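`seekToBeginning` is commonly paired with `getAssignment` to replay a consumer's currently assigned partitions from the start, for example:
```ballerina
kafka:TopicPartition[] assigned = check consumer->getAssignment();
check consumer->seekToBeginning(assigned);
```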
getCommittedOffset
Returns the last committed offset for a partition.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partition | TopicPartition | Yes | The topic-partition to query. |
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: kafka:PartitionOffset|error?
Sample code:
kafka:PartitionOffset? offset = check consumer->getCommittedOffset(
{topic: "test-topic", partition: 0}, 10
);
Sample response:
{"partition": {"topic": "test-topic", "partition": 0}, "offset": 99}
getPositionOffset
Returns the current position (next fetch offset) for a partition.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partition | TopicPartition | Yes | The topic-partition to query. |
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: int|error
Sample code:
int position = check consumer->getPositionOffset(
{topic: "test-topic", partition: 0}, 10
);
Sample response:
100
Partition management
getAssignment
Returns the set of partitions currently assigned to this consumer.
Parameters: None.
Returns: kafka:TopicPartition[]|error
Sample code:
kafka:TopicPartition[] assigned = check consumer->getAssignment();
Sample response:
[{"topic": "test-kafka-topic", "partition": 0}, {"topic": "test-kafka-topic", "partition": 1}]
getSubscription
Returns the current topic subscription.
Parameters: None.
Returns: string[]|error
Sample code:
string[] topics = check consumer->getSubscription();
Sample response:
["test-kafka-topic", "order-topic"]
getTopicPartitions
Returns the partition metadata for a topic.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
topic | string | Yes | The Kafka topic name. |
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: kafka:TopicPartition[]|error
Sample code:
kafka:TopicPartition[] partitions = check consumer->getTopicPartitions("test-topic", 10);
Sample response:
[{"topic": "test-topic", "partition": 0}, {"topic": "test-topic", "partition": 1}, {"topic": "test-topic", "partition": 2}]
pause
Pauses consumption from the specified partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to pause. |
Returns: error?
Sample code:
check consumer->pause([{topic: "test-topic", partition: 0}]);
resume
Resumes consumption from previously paused partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to resume. |
Returns: error?
Sample code:
check consumer->resume([{topic: "test-topic", partition: 0}]);
getPausedPartitions
Returns the set of partitions currently paused.
Parameters: None.
Returns: kafka:TopicPartition[]|error
Sample code:
kafka:TopicPartition[] paused = check consumer->getPausedPartitions();
Sample response:
[{"topic": "test-topic", "partition": 0}]
getAvailableTopics
Returns all topics available on the Kafka cluster.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: string[]|error
Sample code:
string[] availableTopics = check consumer->getAvailableTopics(10);
Sample response:
["test-kafka-topic", "order-topic", "payment-topic", "__consumer_offsets"]
getBeginningOffsets
Returns the earliest available offsets for the given partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to query. |
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: kafka:PartitionOffset[]|error
Sample code:
kafka:PartitionOffset[] beginOffsets = check consumer->getBeginningOffsets(
[{topic: "test-topic", partition: 0}], 10
);
Sample response:
[{"partition": {"topic": "test-topic", "partition": 0}, "offset": 0}]
getEndOffsets
Returns the end offsets (next-to-be-written) for the given partitions.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
partitions | TopicPartition[] | Yes | Partitions to query. |
duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: kafka:PartitionOffset[]|error
Sample code:
kafka:PartitionOffset[] endOffsets = check consumer->getEndOffsets(
[{topic: "test-topic", partition: 0}], 10
);
Sample response:
[{"partition": {"topic": "test-topic", "partition": 0}, "offset": 1523}]
offsetsForTimes
Returns the offsets for the given partitions whose timestamps are greater than or equal to the provided timestamps. Useful for replaying messages from a specific point in time.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
topicPartitionTimestamps | TopicPartitionTimestamp[] | Yes | Array of [TopicPartition, int] tuples where int is the target timestamp in milliseconds since epoch. |
duration | decimal? | No | Timeout in seconds. Defaults to () (uses the consumer's configured default API timeout). |
Returns: kafka:TopicPartitionOffset[]|error
Each element of the returned array is a [TopicPartition, OffsetAndTimestamp?] tuple. The OffsetAndTimestamp value contains offset (int), timestamp (int), and leaderEpoch (int?). The second element can be () if no offset was found for the given timestamp.
Sample code:
kafka:TopicPartitionOffset[] offsets = check consumer->offsetsForTimes(
[[{topic: "test-topic", partition: 0}, 1700000000000]]
);
Sample response:
[[{"topic": "test-topic", "partition": 0}, {"offset": 42, "timestamp": 1700000000005, "leaderEpoch": ()}]]
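A typical replay-from-timestamp flow pairs offsetsForTimes with seek. A sketch (the timestamp and topic name are illustrative; the nil check guards partitions with no record at or after the timestamp):
```ballerina
kafka:TopicPartitionOffset[] results = check consumer->offsetsForTimes(
    [[{topic: "test-topic", partition: 0}, 1700000000000]]
);
foreach var [partition, offsetAndTimestamp] in results {
    if offsetAndTimestamp is kafka:OffsetAndTimestamp {
        // Rewind this partition to the first record at or after the timestamp.
        check consumer->seek({partition, offset: offsetAndTimestamp.offset});
    }
}
```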
Lifecycle
close
Closes the consumer and releases all resources.
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
duration | decimal | No | Timeout in seconds to wait for graceful shutdown. Defaults to -1 (uses the consumer's configured default API timeout). |
Returns: error?
Sample code:
check consumer->close(15);