Google Cloud Services - PubSub
This extension allows using Google Cloud PubSub inside your Quarkus application.
Be sure to read the Google Cloud Services extension pack global documentation before this one; it contains general configuration and information.
Bootstrapping the project
First, we need a new project. Create a new project with the following command (replace the version placeholder with the correct one):
mvn io.quarkus:quarkus-maven-plugin:<quarkusVersion>:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=pubsub-quickstart \
-Dextensions="resteasy-reactive-jackson,quarkus-google-cloud-pubsub"
cd pubsub-quickstart
This command generates a Maven project, importing the Google Cloud PubSub extension.
If you already have your Quarkus project configured, you can add the quarkus-google-cloud-pubsub
extension to your project by running the following command in your project base directory:
./mvnw quarkus:add-extension -Dextensions="quarkus-google-cloud-pubsub"
This will add the following to your pom.xml:
<dependency>
<groupId>io.quarkiverse.googlecloudservices</groupId>
<artifactId>quarkus-google-cloud-pubsub</artifactId>
</dependency>
Preparatory steps
To test PubSub, you first need to create a topic named test-topic.
You can create one with gcloud:
gcloud pubsub topics create test-topic
Authentication
This extension provides a QuarkusPubSub CDI bean that helps you interact with Google PubSub.
QuarkusPubSub is automatically authenticated, so you don’t have to do anything else to use it.
If you don’t want to use QuarkusPubSub, be sure to configure the authentication.
By default, PubSub mandates the usage of the GOOGLE_APPLICATION_CREDENTIALS environment variable to define its credentials, so you need to set it instead of relying on the quarkus.google.cloud.service-account-location property.
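When you rely on the environment variable, it can be convenient to check its value once at startup. The following is a minimal, JDK-only sketch; the CredentialsEnvCheck class and its methods are hypothetical helpers, not part of the extension or of the Google client library.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical helper (not part of the extension): inspects the value of
// GOOGLE_APPLICATION_CREDENTIALS without calling any Google API.
class CredentialsEnvCheck {

    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    // Returns the configured key file path, or empty when the value is unset or blank.
    static Optional<Path> credentialsPath(String rawValue) {
        if (rawValue == null || rawValue.isBlank()) {
            return Optional.empty();
        }
        return Optional.of(Path.of(rawValue.trim()));
    }

    // Builds a one-line status message suitable for a startup log.
    static String describe(String rawValue) {
        Optional<Path> path = credentialsPath(rawValue);
        if (path.isEmpty()) {
            return ENV_VAR + " is not set";
        }
        Path file = path.get();
        boolean usable = Files.isRegularFile(file) && Files.isReadable(file);
        return usable
                ? ENV_VAR + " points to a readable file: " + file
                : ENV_VAR + " does not point to a readable file: " + file;
    }

    public static void main(String[] args) {
        System.out.println(describe(System.getenv(ENV_VAR)));
    }
}
```

The check only looks at the file system; it does not validate that the file contains usable credentials.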
Another solution is to inject the CredentialsProvider provided by the extension and use it inside the various PubSub builders and settings objects when instantiating PubSub components.
Example
This is an example usage of the extension: we create a REST resource with a single endpoint that sends a message to the test-topic topic when hit.
We also register a consumer on the same topic at @PostConstruct time that logs every message received on the topic, so we can check that it works.
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.logging.Logger;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;
@Path("/pubsub")
public class PubSubResource {
    private static final Logger LOG = Logger.getLogger(PubSubResource.class);

    @Inject
    QuarkusPubSub pubSub;

    private Subscriber subscriber;

    @PostConstruct
    void init() throws IOException {
        // init topic and subscription
        pubSub.createTopic("test-topic");
        pubSub.createSubscription("test-topic", "test-subscription");

        // Subscribe to PubSub
        MessageReceiver receiver = (message, consumer) -> {
            LOG.infov("Got message {0}", message.getData().toStringUtf8());
            consumer.ack();
        };
        subscriber = pubSub.subscriber("test-subscription", receiver);
        subscriber.startAsync().awaitRunning();
    }

    @PreDestroy
    void destroy() {
        // Stop the subscription at destroy time
        if (subscriber != null) {
            subscriber.stopAsync();
        }
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void pubsub() throws IOException, InterruptedException {
        // Init a publisher to the topic
        Publisher publisher = pubSub.publisher("test-topic");
        try {
            // Create a new message
            ByteString data = ByteString.copyFromUtf8("my-message");
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
            // Publish the message
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
            // Wait for message submission and log the result
            ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
                public void onSuccess(String messageId) {
                    LOG.infov("published with message id {0}", messageId);
                }

                public void onFailure(Throwable t) {
                    LOG.warnv("failed to publish: {0}", t);
                }
            }, MoreExecutors.directExecutor());
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
Dev Service
Configuring the Dev Service
The extension provides a Dev Service that can be used to run a local PubSub emulator. This is useful for testing purposes, so you don’t have to rely on a real PubSub instance. By default, the Dev Service is disabled, but you can enable it by setting the quarkus.google.cloud.pubsub.devservice.enabled property to true.
You can also set the quarkus.google.cloud.pubsub.devservice.emulator-port property to change the port on which the emulator is started (by default no port is set, so the emulator uses a random port).
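As a minimal sketch, the two Dev Service properties described above could be set in application.properties as follows. The port value 8085 is only an illustrative assumption; omit the second property to let the emulator use a random port.

```properties
# Start the PubSub emulator as a Dev Service (disabled by default)
quarkus.google.cloud.pubsub.devservice.enabled=true
# Optional: pin the emulator port (8085 is an arbitrary example value)
quarkus.google.cloud.pubsub.devservice.emulator-port=8085
```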
Using the Dev Service
To connect to the Dev Service, we need to specify a TransportChannelProvider when creating subscriptions and publishers.
We can reuse the code from the previous example and add the TransportChannelProvider to the Subscriber and Publisher. The snippets below build both with the Google client library builders so that the channel provider can be set. So what do we need to change?
First, we inject the quarkus.google.cloud.pubsub.emulator-host property and declare a TransportChannelProvider field that we can reuse:
@ConfigProperty(name = "quarkus.google.cloud.pubsub.emulator-host")
String emulatorHost;
private TransportChannelProvider channelProvider;
Then, within the init method, we can create a TransportChannelProvider that connects to the Dev Service:
// Create a ChannelProvider that connects to the Dev Service
ManagedChannel channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
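It can be useful to validate the injected value before building the channel. The sketch below assumes the property holds a host:port pair (for example localhost:8085, an illustrative value); the EmulatorHost class is a hypothetical helper that uses only the JDK and is not part of the extension.

```java
// Hypothetical helper (not part of the extension): splits an emulator
// address of the assumed form host:port into its two parts.
final class EmulatorHost {
    final String host;
    final int port;

    private EmulatorHost(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses and validates a host:port string, rejecting malformed values early.
    static EmulatorHost parse(String value) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("emulator host is empty");
        }
        String trimmed = value.trim();
        int separator = trimmed.lastIndexOf(':');
        if (separator <= 0 || separator == trimmed.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + trimmed + "'");
        }
        int port;
        try {
            port = Integer.parseInt(trimmed.substring(separator + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number in '" + trimmed + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return new EmulatorHost(trimmed.substring(0, separator), port);
    }

    // Rebuilds the normalized host:port string, e.g. to pass as a channel target.
    String target() {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        EmulatorHost parsed = parse("localhost:8085");
        System.out.println(parsed.host + " " + parsed.port);
    }
}
```

With such a helper, a misconfigured value fails with a descriptive message at startup instead of at the first PubSub call.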
Also in the same method, when creating the Subscriber, we set the TransportChannelProvider:
// Create a subscriber and set the ChannelProvider
subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).build();
subscriber.startAsync().awaitRunning();
The same is done when creating the Publisher in the pubsub method:
// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setChannelProvider(channelProvider)
        .build();
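The builder snippets in this section reference topicName and subscriptionName without showing how they are built. In the Google client library these are typically created with TopicName.of(projectId, topic) and ProjectSubscriptionName.of(projectId, subscription), which render as fully qualified resource paths. The JDK-only sketch below illustrates just that path format; the PubSubNames class and the my-project id are hypothetical.

```java
// Hypothetical helper: formats fully qualified Pub/Sub resource paths
// (projects/<project>/topics/<topic> and projects/<project>/subscriptions/<subscription>).
final class PubSubNames {
    private PubSubNames() {
    }

    static String topicPath(String projectId, String topic) {
        return "projects/" + requireId(projectId, "projectId")
                + "/topics/" + requireId(topic, "topic");
    }

    static String subscriptionPath(String projectId, String subscription) {
        return "projects/" + requireId(projectId, "projectId")
                + "/subscriptions/" + requireId(subscription, "subscription");
    }

    // Rejects empty ids and ids that would break the path structure.
    private static String requireId(String value, String label) {
        if (value == null || value.isBlank() || value.contains("/")) {
            throw new IllegalArgumentException(label + " must be a non-empty id without '/'");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(topicPath("my-project", "test-topic"));
        System.out.println(subscriptionPath("my-project", "test-subscription"));
    }
}
```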
Finally, we also set the TransportChannelProvider when creating the SubscriptionAdminClient in the initSubscription method:
SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setTransportChannelProvider(channelProvider)
        .build();
Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default |
---|---|---|
 | boolean | |
Sets the Docker image name for the Google Cloud SDK. This image is used to emulate the Pub/Sub service in the development environment. The default value is 'gcr.io/google.com/cloudsdktool/google-cloud-cli'. Environment variable: | string | |
 | int | |
 | string | |