Google Cloud Services - PubSub
This extension allows using Google Cloud PubSub inside your Quarkus application.
Be sure to read the Google Cloud Services extension pack global documentation before this one; it contains general configuration and information.
Bootstrapping the project
First, we need a new project. Create a new project with the following command (replace the version placeholder with the correct one):
mvn io.quarkus:quarkus-maven-plugin:<quarkusVersion>:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=pubsub-quickstart \
-Dextensions="resteasy-reactive-jackson,quarkus-google-cloud-pubsub"
cd pubsub-quickstart
This command generates a Maven project, importing the Google Cloud PubSub extension.
If you already have your Quarkus project configured, you can add the quarkus-google-cloud-pubsub
extension to your project by running the following command in your project base directory:
./mvnw quarkus:add-extension -Dextensions="quarkus-google-cloud-pubsub"
This will add the following to your pom.xml:
<dependency>
<groupId>io.quarkiverse.googlecloudservices</groupId>
<artifactId>quarkus-google-cloud-pubsub</artifactId>
</dependency>
Preparatory steps
To test PubSub, you first need to create a topic named test-topic. You can create one with gcloud:
gcloud pubsub topics create test-topic
Authentication
This extension provides a QuarkusPubSub CDI bean that helps you interact with Google PubSub. QuarkusPubSub is automatically authenticated, so you don’t have to do anything else to use it.
If you don’t want to use QuarkusPubSub, be sure to configure the authentication. By default, PubSub mandates the usage of the GOOGLE_APPLICATION_CREDENTIALS environment variable to define its credentials, so you need to set it instead of relying on the quarkus.google.cloud.service-account-location property.
Another solution is to inject the CredentialsProvider provided by the extension and to use it inside the various PubSub builders and settings objects when instantiating PubSub components.
If you are connecting to the emulator (and thus quarkus.google.cloud.pubsub.emulator-host is set), the extension will automatically use the NoCredentialsProvider for authentication. This behaviour can be disabled by setting the quarkus.google.cloud.pubsub.use-emulator-credentials property to false.
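The interplay of the two emulator properties can be summarised as a small decision rule. The sketch below is plain Java and only mirrors the behaviour described above; the class, enum, and method names are hypothetical and are not part of the extension.

```java
import java.util.Optional;

// Illustrative only: models the documented rule, not the extension's actual code.
class CredentialsChoice {

    // Hypothetical labels for the two outcomes described in the text.
    enum Kind { NO_CREDENTIALS, CONFIGURED_CREDENTIALS }

    // emulatorHost stands for quarkus.google.cloud.pubsub.emulator-host,
    // useEmulatorCredentials for quarkus.google.cloud.pubsub.use-emulator-credentials.
    static Kind choose(Optional<String> emulatorHost, boolean useEmulatorCredentials) {
        boolean emulatorConfigured = emulatorHost.filter(h -> !h.isBlank()).isPresent();
        return (emulatorConfigured && useEmulatorCredentials)
                ? Kind.NO_CREDENTIALS
                : Kind.CONFIGURED_CREDENTIALS;
    }

    public static void main(String[] args) {
        System.out.println(choose(Optional.of("localhost:8085"), true));  // NO_CREDENTIALS
        System.out.println(choose(Optional.of("localhost:8085"), false)); // CONFIGURED_CREDENTIALS
        System.out.println(choose(Optional.empty(), true));               // CONFIGURED_CREDENTIALS
    }
}
```

In other words, real credentials are only skipped when an emulator host is set and the emulator-credentials flag has not been switched off.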
Example
This is an example usage of the extension: we create a REST resource with a single endpoint that sends a message to the test-topic topic when hit. We also register a consumer to the same topic at @PostConstruct time that logs all received messages on the topic so we can check that it works.
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriberInterface;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
    private static final Logger LOG = Logger.getLogger(PubSubResource.class);

    @Inject
    QuarkusPubSub pubSub;

    private SubscriberInterface subscriber;

    @PostConstruct
    void init() throws IOException {
        // Init topic and subscription
        pubSub.createTopic("test-topic");
        pubSub.createSubscription("test-topic", "test-subscription");

        // Subscribe to PubSub: log each message, then acknowledge it
        MessageReceiver receiver = (message, consumer) -> {
            LOG.infov("Got message {0}", message.getData().toStringUtf8());
            consumer.ack();
        };
        subscriber = pubSub.subscriber("test-subscription", receiver);
        subscriber.startAsync().awaitRunning();
    }

    @PreDestroy
    void destroy() {
        // Stop the subscription at destroy time
        if (subscriber != null) {
            subscriber.stopAsync();
        }
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void pubsub() throws IOException, InterruptedException {
        // Init a publisher to the topic
        Publisher publisher = pubSub.publisher("test-topic");
        try {
            // Create a new message
            ByteString data = ByteString.copyFromUtf8("my-message");
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

            // Publish the message
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);

            // Log the result once the message submission completes
            ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
                public void onSuccess(String messageId) {
                    LOG.infov("published with message id {0}", messageId);
                }

                public void onFailure(Throwable t) {
                    LOG.warnv("failed to publish: {0}", t);
                }
            }, MoreExecutors.directExecutor());
        } finally {
            // Flush pending messages and release the publisher's resources
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
AdminClient beans
If you need to perform admin actions on PubSub, you can inject the com.google.cloud.pubsub.v1.SubscriptionAdminClient and com.google.cloud.pubsub.v1.TopicAdminClient as CDI beans, as shown in the example below. This is useful if the basic facilities offered by io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub are not sufficient:
@Inject
SubscriptionAdminClient subscriptionAdminClient;

@Inject
TopicAdminClient topicAdminClient;

public void someMethod() {
    var pushConfig = ...; // Create com.google.pubsub.v1.PushConfig
    subscriptionAdminClient.createSubscription("subscription-name", "topic-name", pushConfig, 10 /* ACK deadline */);
    var topics = topicAdminClient.listTopics("my-google-project");
}
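When calling the admin clients directly, keep in mind that the Google client library generally expects fully qualified resource names rather than bare IDs. The following plain-Java sketch shows the name formats; the helper class and its methods are hypothetical and written only for illustration (the client library ships its own helpers, such as TopicName and ProjectSubscriptionName, for the same purpose).

```java
// Illustrative helper, not part of the extension or the Google client library.
class PubSubNames {

    // Project resource name, e.g. for listing topics in a project.
    static String project(String projectId) {
        return "projects/" + projectId;
    }

    // Fully qualified topic name.
    static String topic(String projectId, String topicId) {
        return project(projectId) + "/topics/" + topicId;
    }

    // Fully qualified subscription name.
    static String subscription(String projectId, String subscriptionId) {
        return project(projectId) + "/subscriptions/" + subscriptionId;
    }

    public static void main(String[] args) {
        System.out.println(project("my-google-project"));
        System.out.println(topic("my-google-project", "topic-name"));
        System.out.println(subscription("my-google-project", "subscription-name"));
    }
}
```

If a call with short names is rejected, passing names built this way is a reasonable first thing to try.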
Pull vs Push messages
PubSub offers two modes of operation: pull and push. The pull model is the regular model, in which the application regularly checks whether messages are available and pulls them from the broker, after which they are dispatched internally in the application. This approach works fine for applications that run constantly, but fails for applications that are started on request, e.g. on Cloud Run. For these situations, a push model can be used instead.
In the push model, the application exposes a specific HTTP/REST endpoint which the Google PubSub infrastructure can call to deliver the messages. On Cloud Run, this would imply the service would be started (if needed) to handle these messages.
This extension can be configured to use the push model. In this setup:
* The application can expose (if configured) an endpoint to respond to the HTTP request from the PubSub infrastructure.
  * If you want to set up your own handling, there is a convenience method available to call for the internal handling.
* Incoming requests will be verified and checked for the right authentication credentials, and denied if this fails.
* Using QuarkusPubSub, subscribers can be registered to handle incoming messages. The subscriber methods automatically use the pull or push versions depending on the quarkus.google.cloud.pubsub.push.enabled flag.
Note that any message which cannot be successfully delivered to the application and processed will be marked for redelivery. This includes errors or simply failing to ack() the message using the AckReplyConsumer provided in the MessageReceiver interface.
When running Quarkus tests, messages will also be authenticated, which will fail because you cannot provide a valid JWT signed with the Google private keys. To resolve this issue, there are two options:
* Use pull subscriptions for testing purposes. For the %test profile, set the quarkus.google.cloud.pubsub.push.enabled flag to false. This will result in the code using pull messages instead of push messages without any underlying code changes.
* Use a mock implementation of the TokenVerifier class, as can be seen in the PubSubPushResourceTest in the integration tests of this extension.
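For the first option, a minimal application.properties fragment could look like the following. It relies on the standard Quarkus profile prefix, and the placement in application.properties is an assumption about your project layout.

```properties
# Use pull subscriptions when running under the test profile
%test.quarkus.google.cloud.pubsub.push.enabled=false
```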
Consult the Google Cloud PubSub push documentation for more information on the push model, configuring the topics and subscriptions and configuring authentication.
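If you handle the push request yourself, note that Google delivers each message inside a JSON envelope whose message.data field is base64-encoded. The sketch below only shows the decoding step using the JDK; the class and method names are hypothetical, and extracting the field from the JSON body is left to whichever JSON library you already use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative helper, not part of the extension.
class PushPayload {

    // Decodes the base64 "data" field of a push envelope into UTF-8 text.
    static String decodeData(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "bXktbWVzc2FnZQ==" is the base64 encoding of "my-message"
        System.out.println(decodeData("bXktbWVzc2FnZQ=="));
    }
}
```

When subscribers are registered through QuarkusPubSub, this decoding should not be needed, since the receiver is handed a PubsubMessage as in the pull example.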
Dev Service
Configuring the Dev Service
The extension provides a Dev Service that can be used to run a local PubSub emulator. This is useful for testing purposes, so you don’t have to rely on a real PubSub instance. By default, the Dev Service is disabled, but you can enable it by setting the quarkus.google.cloud.pubsub.devservice.enabled property to true.
You can also set the quarkus.google.cloud.pubsub.devservice.emulator-port property to change the port on which the emulator will be started (by default there is no port set, so the emulator will use a random port).
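As a configuration sketch, the two properties could be set in application.properties as follows. The port value 8085 is an arbitrary example, not a documented default.

```properties
# Start a local PubSub emulator as a Dev Service
quarkus.google.cloud.pubsub.devservice.enabled=true
# Optional: pin the emulator port instead of using a random one (example value)
quarkus.google.cloud.pubsub.devservice.emulator-port=8085
```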
Using the Dev Service
Note: the setup below is handled automatically if you use QuarkusPubSub to create publishers and subscribers.
If we want to connect to the Dev Service, we need to specify a TransportChannelProvider when creating subscriptions and publishers. We can just reuse the code from the previous example and add the TransportChannelProvider to the Subscriber and Publisher. So what do we need to change?
First, we declare a variable that we can reuse, and also inject the quarkus.google.cloud.pubsub.emulator-host property:
@ConfigProperty(name = "quarkus.google.cloud.pubsub.emulator-host")
String emulatorHost;
private TransportChannelProvider channelProvider;
Then, we can create a TransportChannelProvider that provides a connection to the Dev Service within the init method:
// Create a ChannelProvider that connects to the Dev Service
ManagedChannel channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
Also in the same method, when creating the Subscriber, we set the TransportChannelProvider:
// Create a subscriber and set the ChannelProvider
subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).build();
subscriber.startAsync().awaitRunning();
The same is done when creating the Publisher in the pubsub method:
// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setChannelProvider(channelProvider)
        .build();
And finally, we also set the TransportChannelProvider when creating the SubscriptionAdminClient in the initSubscription method:
SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setTransportChannelProvider(channelProvider)
        .build();
Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default |
---|---|---|
Indicates to use the dev service for Firebase. The default value is not set unless the firebase module is included. In that case, the Firebase devservices will by default be preferred and the DevService for PubSub will be disabled. | boolean | |
Indicates whether the Pub/Sub service should be enabled or not. The default value is 'false'. | boolean | |
Sets the Docker image name for the Google Cloud SDK. This image is used to emulate the Pub/Sub service in the development environment. The default value is 'gcr.io/google.com/cloudsdktool/google-cloud-cli'. | string | |
Specifies the emulatorPort on which the Pub/Sub service should run in the development environment. | int | |
Enable push configuration. | boolean | |
The endpoint path for the pubsub push calls. | string | |
Audiences to accept for pubsub push messages. This can be set as a comma-separated list for multiple audiences. If the audience is not configured, the | string | |
In case this is set, a query-parameter called "token" is expected to be present in the request with the same value as this configuration option. Calls without this token will be denied. If this property is not set the token will be ignored. | string | |
Email address of the service account used to send the pub-sub messages. The JWT used for authentication will contain this email address when Google PubSub calls the push-endpoint. | string | |
Enable emulator and set its host. | string | |
Forces the usage of emulator credentials. The logic automatically uses emulator credentials in case the emulatorHost is set. | boolean | |