Actions

The ballerinax/kafka package exposes the following clients:

| Client | Purpose |
| --- | --- |
| Producer | Publishes messages to Kafka topics with configurable serialization, compression, and delivery guarantees. |
| Consumer | Subscribes to Kafka topics and polls for messages with manual offset management. |

For event-driven integration, see the Trigger Reference.
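
For orientation, the two clients compose into a simple produce-then-consume round trip. A minimal sketch, assuming a local broker reachable at kafka:DEFAULT_URL and an existing test-kafka-topic:

import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // Publish one message.
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    check producer->send({topic: "test-kafka-topic", value: "Hello".toBytes()});
    check producer->close();

    // Read it back with a short-lived consumer.
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "quickstart-group",
        topics: ["test-kafka-topic"],
        offsetReset: kafka:OFFSET_RESET_EARLIEST
    });
    kafka:BytesConsumerRecord[] records = check consumer->poll(5);
    io:println("Received ", records.length(), " record(s)");
    check consumer->close();
}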


Producer

Publishes messages to Kafka topics with configurable serialization, compression, and delivery guarantees.

Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| acks | ProducerAcks | ACKS_SINGLE | Number of acknowledgments the producer requires ("all", "0", or "1"). |
| compressionType | CompressionType | COMPRESSION_NONE | Compression algorithm for messages ("none", "gzip", "snappy", "lz4", "zstd"). |
| clientId | string? | () | Identifier sent to the broker for logging and monitoring. |
| keySerializerType | SerializerType | SER_BYTE_ARRAY | Serializer for message keys. |
| valueSerializerType | SerializerType | SER_BYTE_ARRAY | Serializer for message values. |
| transactionalId | string? | () | Transactional ID for exactly-once delivery. |
| enableIdempotence | boolean | false | Enable idempotent producer for exactly-once semantics. |
| retryCount | int? | () | Number of retries for failed send attempts. |
| batchSize | int? | () | Maximum number of bytes to batch before sending. |
| linger | decimal? | () | Time in seconds to wait for additional messages before sending a batch. |
| requestTimeout | decimal? | () | Time in seconds to wait for a response from the broker. |
| schemaRegistryUrl | string? | () | URL of the Confluent Schema Registry for Avro serialization. |
| secureSocket | SecureSocket? | () | SSL/TLS configuration for encrypted connections. |
| auth | AuthenticationConfiguration? | () | SASL authentication configuration. |
| securityProtocol | SecurityProtocol | PROTOCOL_PLAINTEXT | Security protocol ("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL"). |
| additionalProperties | map<string>? | () | Additional Kafka producer properties not covered by named fields. |

Initializing the client

import ballerinax/kafka;

kafka:Producer producer = check new (kafka:DEFAULT_URL, {
    clientId: "my-producer",
    acks: kafka:ACKS_ALL,
    retryCount: 3
});
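
For exactly-once delivery, combine transactionalId and enableIdempotence (with acks set to "all") and perform the sends inside a Ballerina transaction statement. A hedged sketch, assuming the broker supports transactions; the transactional ID and topic names are illustrative:

kafka:Producer txProducer = check new (kafka:DEFAULT_URL, {
    transactionalId: "order-tx-producer",
    enableIdempotence: true,
    acks: kafka:ACKS_ALL
});

transaction {
    // Both sends commit atomically; on failure the transaction aborts.
    check txProducer->send({topic: "orders", value: "order-1".toBytes()});
    check txProducer->send({topic: "orders", value: "order-2".toBytes()});
    check commit;
}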

Operations

Produce messages

send

Sends a message to a Kafka topic.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| producerRecord | AnydataProducerRecord | Yes | The record containing the topic, key, value, and optional headers. |

Returns: error?

Sample code:

check producer->send({
    topic: "test-kafka-topic",
    key: "order-1".toBytes(),
    value: "Hello World, Ballerina".toBytes()
});

sendWithMetadata

Sends a message and returns the record metadata (partition, offset, timestamp).

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| producerRecord | AnydataProducerRecord | Yes | The record containing the topic, key, value, and optional headers. |

Returns: kafka:RecordMetadata|error

Sample code:

kafka:RecordMetadata metadata = check producer->sendWithMetadata({
    topic: "test-kafka-topic",
    value: "Hello Kafka".toBytes()
});

Sample response:

{"topic": "test-kafka-topic", "partition": 0, "offset": 42, "timestamp": 1700000000000, "serializedKeySize": -1, "serializedValueSize": 11}
note

offset and timestamp can be () (null) depending on the delivery configuration. Always handle them as nullable (int?) in your code.
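
For example, a null-safe read of the returned offset (a sketch reusing the call above):

kafka:RecordMetadata metadata = check producer->sendWithMetadata({
    topic: "test-kafka-topic",
    value: "Hello Kafka".toBytes()
});
int? offset = metadata?.offset;
if offset is int {
    // The offset is only usable after the nil check.
}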

flush

Flushes all buffered messages, blocking until all sends are complete.

Parameters: none.

Returns: error?

Sample code:

check producer->'flush();

Topic metadata

getTopicPartitions

Returns the partition metadata for a given topic.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| topic | string | Yes | The Kafka topic name. |

Returns: kafka:TopicPartition[]|error

Sample code:

kafka:TopicPartition[] partitions = check producer->getTopicPartitions("test-kafka-topic");

Sample response:

[{"topic": "test-kafka-topic", "partition": 0}, {"topic": "test-kafka-topic", "partition": 1}, {"topic": "test-kafka-topic", "partition": 2}]

Lifecycle

close

Closes the producer and releases all resources.

Parameters: none.

Returns: error?

Sample code:

check producer->close();

Consumer

Subscribes to Kafka topics and polls for messages with manual offset management.

Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| groupId | string? | () | Consumer group identifier for coordinated consumption. |
| topics | string\|string[]? | () | Topic or list of topics to subscribe to when the consumer starts. |
| offsetReset | OffsetResetMethod? | () | Strategy when no initial offset exists ("earliest", "latest", "none"). |
| keyDeserializerType | DeserializerType | DES_BYTE_ARRAY | Deserializer for message keys. |
| valueDeserializerType | DeserializerType | DES_BYTE_ARRAY | Deserializer for message values. |
| autoCommit | boolean | true | Automatically commit offsets at regular intervals. |
| autoCommitInterval | decimal? | () | Interval in seconds between automatic offset commits. |
| sessionTimeout | decimal? | () | Timeout in seconds for detecting consumer failures. |
| heartBeatInterval | decimal? | () | Interval in seconds between heartbeats to the consumer coordinator. |
| maxPollRecords | int? | () | Maximum number of records returned per poll call. |
| isolationLevel | IsolationLevel? | () | Controls how transactional messages are read ("read_committed" or "read_uncommitted"). |
| schemaRegistryUrl | string? | () | URL of the Confluent Schema Registry for Avro deserialization. |
| pollingTimeout | decimal? | () | Timeout in seconds for each poll call. |
| pollingInterval | decimal? | () | Interval in seconds between consecutive polls (used with Listener). |
| secureSocket | SecureSocket? | () | SSL/TLS configuration for encrypted connections. |
| auth | AuthenticationConfiguration? | () | SASL authentication configuration. |
| securityProtocol | SecurityProtocol | PROTOCOL_PLAINTEXT | Security protocol ("PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL", "SSL"). |
| additionalProperties | map<string>? | () | Additional Kafka consumer properties not covered by named fields. |
| validation | boolean | true | Enable constraint validation on deserialized records. |
| decoupleProcessing | boolean | false | Decouple record processing from polling for improved throughput. |
| autoSeekOnValidationFailure | boolean | true | Automatically seek past records that fail data-binding or constraint validation. Set to false to stop and surface the error instead. |

Initializing the client

import ballerinax/kafka;

kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
    groupId: "my-group",
    topics: ["test-kafka-topic"],
    offsetReset: kafka:OFFSET_RESET_EARLIEST
});
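
Connecting to a secured cluster combines securityProtocol with auth. A sketch with a hypothetical broker address and credentials, using SASL/PLAIN:

kafka:Consumer secureConsumer = check new ("broker.example.com:9093", {
    groupId: "my-group",
    topics: ["test-kafka-topic"],
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT,
    auth: {
        mechanism: kafka:AUTH_SASL_PLAIN,
        username: "user",
        password: "secret"
    }
});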

Operations

Subscribe & assign

subscribe

Subscribes the consumer to one or more topics.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| topics | string\|string[] | Yes | Topic name or array of topic names to subscribe to. |

Returns: error?

note

groupId must be set in the consumer configuration before calling subscribe(). Calling this method without a groupId causes a panic, not a returned error, and cannot be caught with check.

Sample code:

check consumer->subscribe(["topic-1", "topic-2"]);
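
Since subscription can happen after initialization, a consumer meant for dynamic subscription can be created with only a groupId. A minimal sketch:

kafka:Consumer dynamicConsumer = check new (kafka:DEFAULT_URL, {groupId: "my-group"});
check dynamicConsumer->subscribe(["topic-1", "topic-2"]);
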
subscribeWithPattern

Subscribes to all topics matching a regex pattern.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| regex | string | Yes | Regular expression pattern for topic names. |

Returns: error?

Sample code:

check consumer->subscribeWithPattern("order-.*");

unsubscribe

Unsubscribes from all currently subscribed topics.

Parameters: none.

Returns: error?

Sample code:

check consumer->unsubscribe();

assign

Manually assigns specific topic-partitions to this consumer.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Array of topic-partition pairs to assign. |

Returns: error?

Sample code:

check consumer->assign([{topic: "test-topic", partition: 0}]);

Poll messages

poll

Polls for consumer records, returning full record metadata including key, value, offset, and headers.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| timeout | decimal | Yes | Maximum time in seconds to block waiting for records. |
| T | typedesc<AnydataConsumerRecord[]> | No | Expected consumer record array type for deserialization. |

Returns: T|error

Sample code:

kafka:BytesConsumerRecord[] records = check consumer->poll(1);
foreach var rec in records {
    string value = check string:fromBytes(rec.value);
}

Sample response:

[{"key": "order-1", "value": [72, 101, 108, 108, 111], "timestamp": 1700000000000, "offset": {"partition": {"topic": "test-kafka-topic", "partition": 0}, "offset": 42}, "headers": {}}]
pollPayload

Polls for message payloads only, without record metadata.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| timeout | decimal | Yes | Maximum time in seconds to block waiting for records. |
| T | typedesc<anydata[]> | No | Expected payload array type for deserialization. |

Returns: T|error

Sample code:

string[] payloads = check consumer->pollPayload(1);

Sample response:

["Hello World, Ballerina", "Order received", "Payment processed"]

Offset management

commit

Commits the current offsets for all subscribed partitions.

Parameters: none.

Returns: error?

Sample code:

check consumer->'commit();
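
A typical manual-offset workflow disables autoCommit and commits only after records are processed. A sketch, assuming the consumer was created with autoCommit: false:

kafka:BytesConsumerRecord[] records = check consumer->poll(1);
foreach var rec in records {
    // Process each record before acknowledging anything.
}
// Mark everything polled so far as consumed only after successful processing.
check consumer->'commit();
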
commitOffset

Commits specific offsets for specific partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| offsets | PartitionOffset[] | Yes | Array of partition-offset pairs to commit. |
| duration | decimal | No | Timeout in seconds for the commit operation. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: error?

Sample code:

check consumer->commitOffset(
    [{partition: {topic: "test-topic", partition: 0}, offset: 100}],
    10
);

seek

Seeks to a specific offset in a topic-partition.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| offset | PartitionOffset | Yes | The partition and offset to seek to. |

Returns: error?

Sample code:

check consumer->seek({
    partition: {topic: "test-topic", partition: 0},
    offset: 0
});

seekToBeginning

Seeks to the beginning of the specified partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to seek to the beginning. |

Returns: error?

Sample code:

check consumer->seekToBeginning([{topic: "test-topic", partition: 0}]);

seekToEnd

Seeks to the end of the specified partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to seek to the end. |

Returns: error?

Sample code:

check consumer->seekToEnd([{topic: "test-topic", partition: 0}]);

getCommittedOffset

Returns the last committed offset for a partition.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partition | TopicPartition | Yes | The topic-partition to query. |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: kafka:PartitionOffset|error?

Sample code:

kafka:PartitionOffset? offset = check consumer->getCommittedOffset(
    {topic: "test-topic", partition: 0}, 10
);

Sample response:

{"partition": {"topic": "test-topic", "partition": 0}, "offset": 99}
getPositionOffset

Returns the current position (next fetch offset) for a partition.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partition | TopicPartition | Yes | The topic-partition to query. |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: int|error

Sample code:

int position = check consumer->getPositionOffset(
    {topic: "test-topic", partition: 0}, 10
);

Sample response:

100

Partition management

getAssignment

Returns the set of partitions currently assigned to this consumer.

Parameters: none.

Returns: kafka:TopicPartition[]|error

Sample code:

kafka:TopicPartition[] assigned = check consumer->getAssignment();

Sample response:

[{"topic": "test-kafka-topic", "partition": 0}, {"topic": "test-kafka-topic", "partition": 1}]
getSubscription

Returns the current topic subscription.

Parameters: none.

Returns: string[]|error

Sample code:

string[] topics = check consumer->getSubscription();

Sample response:

["test-kafka-topic", "order-topic"]
getTopicPartitions

Returns the partition metadata for a topic.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| topic | string | Yes | The Kafka topic name. |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: kafka:TopicPartition[]|error

Sample code:

kafka:TopicPartition[] partitions = check consumer->getTopicPartitions("test-topic", 10);

Sample response:

[{"topic": "test-topic", "partition": 0}, {"topic": "test-topic", "partition": 1}, {"topic": "test-topic", "partition": 2}]
pause

Pauses consumption from the specified partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to pause. |

Returns: error?

Sample code:

check consumer->pause([{topic: "test-topic", partition: 0}]);

resume

Resumes consumption from previously paused partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to resume. |

Returns: error?

Sample code:

check consumer->resume([{topic: "test-topic", partition: 0}]);

getPausedPartitions

Returns the set of partitions currently paused.

Parameters: none.

Returns: kafka:TopicPartition[]|error

Sample code:

kafka:TopicPartition[] paused = check consumer->getPausedPartitions();

Sample response:

[{"topic": "test-topic", "partition": 0}]
getAvailableTopics

Returns all topics available on the Kafka cluster.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: string[]|error

Sample code:

string[] availableTopics = check consumer->getAvailableTopics(10);

Sample response:

["test-kafka-topic", "order-topic", "payment-topic", "__consumer_offsets"]
getBeginningOffsets

Returns the earliest available offsets for the given partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to query. |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: kafka:PartitionOffset[]|error

Sample code:

kafka:PartitionOffset[] beginOffsets = check consumer->getBeginningOffsets(
    [{topic: "test-topic", partition: 0}], 10
);

Sample response:

[{"partition": {"topic": "test-topic", "partition": 0}, "offset": 0}]
getEndOffsets

Returns the end offsets (next-to-be-written) for the given partitions.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| partitions | TopicPartition[] | Yes | Partitions to query. |
| duration | decimal | No | Timeout in seconds. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: kafka:PartitionOffset[]|error

Sample code:

kafka:PartitionOffset[] endOffsets = check consumer->getEndOffsets(
    [{topic: "test-topic", partition: 0}], 10
);

Sample response:

[{"partition": {"topic": "test-topic", "partition": 0}, "offset": 1523}]
offsetsForTimes

Returns the offsets for the given partitions whose timestamps are greater than or equal to the provided timestamps. Useful for replaying messages from a specific point in time.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| topicPartitionTimestamps | TopicPartitionTimestamp[] | Yes | Array of [TopicPartition, int] tuples where int is the target timestamp in milliseconds since epoch. |
| duration | decimal? | No | Timeout in seconds. Defaults to () (uses the consumer's configured default API timeout). |

Returns: kafka:TopicPartitionOffset[]|error

Each element of the returned array is a [TopicPartition, OffsetAndTimestamp?] tuple. The OffsetAndTimestamp value contains offset (int), timestamp (int), and leaderEpoch (int?). The second element can be () if no offset was found for the given timestamp.

Sample code:

kafka:TopicPartitionOffset[] offsets = check consumer->offsetsForTimes(
    [[{topic: "test-topic", partition: 0}, 1700000000000]]
);

Sample response:

[[{"topic": "test-topic", "partition": 0}, {"offset": 42, "timestamp": 1700000000005, "leaderEpoch": ()}]]

Lifecycle

close

Closes the consumer and releases all resources.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| duration | decimal | No | Timeout in seconds to wait for graceful shutdown. Defaults to -1 (uses the consumer's configured default API timeout). |

Returns: error?

Sample code:

check consumer->close(15);