Google Cloud Services - PubSub

This extension allows using Google Cloud PubSub inside your Quarkus application.

Be sure to have read the Google Cloud Services extension pack global documentation before this one, it contains general configuration and information.

Bootstrapping the project

First, we need a new project. Create a new project with the following command (replace the version placeholder with the correct one):

mvn io.quarkus:quarkus-maven-plugin:<quarkusVersion>:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=pubsub-quickstart \
    -Dextensions="resteasy-reactive-jackson,quarkus-google-cloud-pubsub"
cd pubsub-quickstart

This command generates a Maven project, importing the Google Cloud PubSub extension.

If you already have your Quarkus project configured, you can add the quarkus-google-cloud-pubsub extension to your project by running the following command in your project base directory:

./mvnw quarkus:add-extension -Dextensions="quarkus-google-cloud-pubsub"

This will add the following to your pom.xml:

<dependency>
    <groupId>io.quarkiverse.googlecloudservices</groupId>
    <artifactId>quarkus-google-cloud-pubsub</artifactId>
</dependency>

Preparatory steps

To test PubSub you first need to create a topic named test-topic

You can create one with gcloud:

gcloud pubsub topics create test-topic

Authentication

This extension provides a QuarkusPubSub CDI bean that can help to interact with Google PubSub. QuarkusPubSub is automatically authenticated, so you don’t have to do anything else to use it.

If you don’t want to use QuarkusPubSub, be sure to configure the authentication. By default, PubSub mandates the usage of the GOOGLE_APPLICATION_CREDENTIALS environment variable to define its credentials, so you need to set it instead of relying on the quarkus.google.cloud.service-account-location property. Another solution, is to inject a CredentialsProvider provided by the extension, and to use it inside the various PubSub builders and settings objects when, instantiating PubSub components.

In case you are connecting to the emulator (and thus quarkus.google.cloud.pubsub.emulator-host is set), the extension will automatically use the NoCredentialsProvider for authentication. This behaviour can be disabled by setting the quarkus.google.cloud.pubsub.use-emulator-credentials to false.

Some example

This is an example usage of the extension: we create a REST resource with a single endpoint that sends a message to the test-topic topic when hit.

We also register a consumer to the same topic at @PostConstruct time that logs all received messages on the topic so we can check that it works.

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriberInterface;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
    private static final Logger LOG = Logger.getLogger(PubSubResource.class);

    @Inject
    QuarkusPubSub pubSub;

    private SubscriberInterface subscriber;

    @PostConstruct
    void init() throws IOException {
        // init topic and subscription
        pubSub.createTopic("test-topic");
        pubSub.createSubscription("test-topic", "test-subscription");

        // Subscribe to PubSub
        MessageReceiver receiver = (message, consumer) -> {
            LOG.infov("Got message {0}", message.getData().toStringUtf8());
            consumer.ack();
        };
        subscriber = pubSub.subscriber("test-subscription", receiver);
        subscriber.startAsync().awaitRunning();
    }

    @PreDestroy
    void destroy() {
        // Stop the subscription at destroy time
        if (subscriber != null) {
            subscriber.stopAsync();
        }
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void pubsub() throws IOException, InterruptedException {
        // Init a publisher to the topic
        Publisher publisher = pubSub.publisher("test-topic");

        try {
            ByteString data = ByteString.copyFromUtf8("my-message");// Create a new message
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);// Publish the message
            ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {// Wait for message submission and log the result
                public void onSuccess(String messageId) {
                    LOG.infov("published with message id {0}", messageId);
                }

                public void onFailure(Throwable t) {
                    LOG.warnv("failed to publish: {0}", t);
                }
            }, MoreExecutors.directExecutor());
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}

AdminClient beans

If you need to perform admin actions on PubSub, you can inject the com.google.cloud.pubsub.v1.SubscriptionAdminClient and com.google.cloud.pubsub.v1.TopicAdminClient as CDI beans as shown in the example below. This is usefull if the basic facilities offered by io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub are not sufficient:

    @Inject
    SubscriptionAdminClient subscriptionAdminClient;

    @Inject
    TopicAdminClient topicAdminClient;

    public someMethod() {
        var pushConfig = ...; // Create com.google.pubsub.v1.PushConfig
        subscriptionAdminClient.createSubscription("subscription-name", "topic-name", pushConfig, 10 /* ACK deadline */);

        var topics = topicAdminClient.listTopics("my-google-project");
    }

Pull vs Push messages

PubSub offers two modes of operation; pull and push. The pull model is the regular model where the application regularly checks if there are messages available and pulls them from the broker, afther which they are dispatched internally in the application. This approach works fine for applications that run constantly, but fails when using e.g. Cloud Run, i.e. applications that are started on request. For these situations, a push model can also be used.

In the push model, the application exposes a specific HTTP/REST endpoint which the Google PubSub infrastructure can call to deliver the messages. On Cloud Run, this would imply the service would be started (if needed) to handle these messages.

This extension can be configured to use the push model. In this setup:

  • The application can expose (if configured) an endpoint to respond to the HTTP request from the PubSub infrastructure

    • If you want to set up your own handling, there is a convenience method available to call for the internal handling

  • Incoming requests will be verified and checked for the right authentication credentials, and denied if this fails

  • Using QuarkusPubSub subscribers can be registered to handle incoming messages. The subscriber methods automatically use the pull of push versions dependening on the quarkus.google.cloud.pubsub.push.enabled flag.

Note that any message which cannot be successfully delivered to the application and processed will be marked for redelivery. This includes errors or simply failing to ack() the message using the AckReplyConsumer provided in the MessageReceiver interface.

When running quarkus-tests, messages will also be authenticated, which will fail as you cannot provide a valid JWT signed with the Google private keys. To resolve this issue, there are two options: * Using pull-subscriptions for testing purposes. For the %test profile, set the quarkus.google.cloud.pubsub.push.enabed flag to false. This will result in the code using pull messages instead of push messages without any underlying code changes * Use a mock implementation of the TokenVerifier class as can be seen in the PubSubPushResourceTest in the integration tests of this extension.

Consult the Google Cloud PubSub push documentation for more information on the push model, configuring the topics and subscriptions and configuring authentication.

Dev Service

Configuring the Dev Service

The extension provides a Dev Service that can be used to run a local PubSub emulator. This is useful for testing purposes, so you don’t have to rely on a real PubSub instance. By default, the Dev Service is disabled, but you can enable it by setting the

  • quarkus.google.cloud.pubsub.devservice.enabled property to true

You can also set the

  • quarkus.google.cloud.pubsub.devservice.emulator-port property to change the port on which the emulator will be started (by default there is no port set, so the emulator will use a random port)

Using the Dev Service

Note: the setup below is handled automatically in case you use QuarkusPubSub to create publishers and subscribers.

If we want to connect to the Dev Service, we need to specify TransportChannelProvider when creating subscriptions and publishers.

We can just reuse the code from the previous example and add the TransportChannelProvider to the Subscriber and Publisher. So what do we need to change?

As a first thing, we should declare a variable which we can then reuse and also inject the quarkus.google.cloud.pubsub.emulator-host property:

@ConfigProperty(name = "quarkus.google.cloud.pubsub.emulator-host")
String emulatorHost;

private TransportChannelProvider channelProvider;

Then, we can create a TransportChannelProvider that provides connection to devservice within the init method:

// Create a ChannelProvider that connects to the Dev Service
ManagedChannel channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

Also in the same method when creating the Subscriber we set the TransportChannelProvider:

// Create a subscriber and set the ChannelProvider
subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).build();
subscriber.startAsync().awaitRunning();

The same is done when creating the Publisher in the pubsub method:

// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(credentialsProvider)
// Set the ChannelProvider
.setChannelProvider(channelProvider)
.build();

And finally we also set the TransportChannelProvider when creating the SubscriptionAdminClient in the initSubscription method:

SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
// Set the ChannelProvider
.setTransportChannelProvider(channelProvider)
.build();

Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Indicates to use the dev service for Firebase. The default value is not setup unless the firebase module is included. In that case, the Firebase devservices will by default be preferred and the DevService for PubSub will be disabled.

Environment variable: QUARKUS_GOOGLE_CLOUD_FIREBASE_DEVSERVICE_PREFER_FIREBASE_DEV_SERVICES

boolean

Indicates whether the Pub/Sub service should be enabled or not. The default value is 'false'.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_DEVSERVICE_ENABLED

boolean

false

Sets the Docker image name for the Google Cloud SDK. This image is used to emulate the Pub/Sub service in the development environment. The default value is 'gcr.io/google.com/cloudsdktool/google-cloud-cli'.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_DEVSERVICE_IMAGE_NAME

string

gcr.io/google.com/cloudsdktool/google-cloud-cli

Specifies the emulatorPort on which the Pub/Sub service should run in the development environment.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_DEVSERVICE_EMULATOR_PORT

int

Enable push configuration

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_PUSH_ENABLED

boolean

false

The endpoint path for the pubsub push calls.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_PUSH_ENDPOINT_PATH

string

Audiences to accept for pubsub push messages. This can be set as a comma-separated list for multiple audiences. If the audience is not configured, the aud claim in the Pub sub JWT will be ignored

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_PUSH_AUDIENCE

string

In case this is set, a query-parameter called "token" is expected to be present in the request with the same value as this configuration option. Calls without this token will be denied. If this property is not set the token will be ignored.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_PUSH_VERIFICATION_TOKEN

string

Email adddress of the service account used to send the pub-sub messages. The JWT used for authentication will contain this email address when Google PubSub calls the push-endpoint.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_PUSH_SERVICE_ACCOUNT_EMAIL

string

Enable emulator and set its host.

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_EMULATOR_HOST

string

Forces the usage of emulator credentials. The logic automatically uses emulator credentials in case the emulatorHost is set.

  • If true: force usage of emulator credentials

  • If false: force not using emulator credentials

Environment variable: QUARKUS_GOOGLE_CLOUD_PUBSUB_USE_EMULATOR_CREDENTIALS

boolean

true