Skip to main content

Triggers

The ballerinax/gcloud.pubsub connector supports event-driven message consumption through a streaming pull mechanism. When messages arrive on a Google Cloud Pub/Sub subscription, the listener receives them in real time and invokes your service's onMessage callback automatically.

Three components work together:

ComponentRole
pubsub:ListenerConnects to Google Cloud Pub/Sub and opens streaming pull connections to receive messages from subscriptions.
pubsub:ServiceDefines the onMessage callback invoked for each received message.
pubsub:CallerProvided to the callback to acknowledge (ack) or reject (nack) the received message.
pubsub:MessageThe message payload passed to the onMessage callback, containing data, attributes, and metadata.

For action-based record operations, see the Action Reference.


Listener

The gcloud.pubsub:Listener establishes the connection and manages event subscriptions.

Configuration

The listener supports the following connection strategies:

Config TypeDescription
ListenerConfigurationConfiguration for the Pub/Sub listener. Authenticates using a GCP service account key file.

ListenerConfiguration fields:

FieldTypeDefaultDescription
authGcpCredentialConfig()GCP service account credentials. Contains a path field pointing to the JSON key file.

Initializing the listener

Initialize with service account credentials:

import ballerinax/gcloud.pubsub;

configurable string project = ?;
configurable string gcpCredentialsFilePath = ?;

listener pubsub:Listener pubsubListener = check new (project,
auth = {path: gcpCredentialsFilePath}
);

Service

A pubsub:Service is a Ballerina service attached to a pubsub:Listener. It is annotated with @pubsub:ServiceConfig to bind it to a specific subscription, and implements the onMessage callback to process incoming messages.

Callback signatures

FunctionSignatureDescription
onMessageremote function onMessage(pubsub:Message message, pubsub:Caller caller) returns error?Invoked when a message is received from the subscription. Use the Caller to acknowledge or reject the message.
note

Use caller->ack() to acknowledge successful processing, or caller->nack() to reject the message and request re-delivery.

Full usage example

import ballerina/log;
import ballerinax/gcloud.pubsub;

configurable string project = ?;
configurable string gcpCredentialsFilePath = ?;
configurable string subscription = ?;

listener pubsub:Listener pubsubListener = check new (project,
auth = {path: gcpCredentialsFilePath}
);

@pubsub:ServiceConfig {subscription}
service on pubsubListener {
remote function onMessage(pubsub:Message message, pubsub:Caller caller) returns error? {
log:printInfo("Received message",
messageId = message.messageId,
data = message.data.toString(),
attributes = message.attributes.toString()
);
// Acknowledge the message on successful processing
check caller->ack();
}
}
note

The @pubsub:ServiceConfig annotation supports additional fields for fine-tuning subscriber behavior: maxAckExtensionPeriod, maxDurationPerAckExtension, minDurationPerAckExtension, parallelPullCount, and flowControlSettings.


Supporting types

Message

FieldTypeDescription
messageIdstring?The unique ID assigned by Pub/Sub when the message is published.
dataanydataThe message payload.
attributesmap<string>?Optional key-value attributes attached to the message.
publishTimetime:Utc?The timestamp when the message was published by the server.
orderingKeystring?The ordering key for message ordering, if set by the publisher.

ServiceConfiguration

FieldTypeDescription
subscriptionstringThe name of the Pub/Sub subscription to pull messages from.
maxAckExtensionPerioddecimalMaximum total time (seconds) the ack deadline can be extended. Default: 3600.
maxDurationPerAckExtensiondecimalUpper bound (seconds) for a single ack deadline extension. Default: 600.
minDurationPerAckExtensiondecimalMinimum duration (seconds) for a single ack deadline extension. Default: 0.0.
parallelPullCountintNumber of parallel streaming pull connections. Default: 1.
flowControlSettingsFlowControlConfig?Flow control settings to limit outstanding messages and bytes.

FlowControlConfig

FieldTypeDescription
maxOutstandingMessageCountintMaximum number of outstanding (unacknowledged) messages. Default: 1000.
maxOutstandingRequestBytesintMaximum total bytes of outstanding messages. Default: 104857600 (100 MB).

GcpCredentialConfig

FieldTypeDescription
pathstringFile path to the GCP service account JSON key file.