Quarkus RabbitMQ Client
RabbitMQ is a popular message broker. This Quarkus extension provides a client for RabbitMQ which is configurable using the application.properties
.
Installation
If you want to use this extension, you need to add the quarkus-rabbitmq-client
extension first.
In your pom.xml
file, add:
<dependency>
<groupId>io.quarkiverse.rabbitmqclient</groupId>
<artifactId>quarkus-rabbitmq-client</artifactId>
<version>3.2.0</version>
</dependency>
Usage
Assuming you have RabbitMQ running on localhost:5672 you should add the following properties to your application.properties
and fill in the values for <username>
and <password>
.
quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=<username>
quarkus.rabbitmqclient.password=<password>
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672
Once you have configured the properties, you can start using the RabbitMQ client.
package org.acme.rabbitmqclient;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.*;
import io.quarkiverse.rabbitmqclient.RabbitMQClient;
import io.quarkus.runtime.StartupEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ApplicationScoped
public class MessageService {
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
@Inject
RabbitMQClient rabbitMQClient;
private Channel channel;
public void onApplicationStart(@Observes StartupEvent event) {
// on application start prepare the queus and message listener
setupQueues();
setupReceiving();
}
private void setupQueues() {
try {
// create a connection
Connection connection = rabbitMQClient.connect();
// create a channel
channel = connection.createChannel();
// declare exchanges and queues
channel.exchangeDeclare("test", BuiltinExchangeType.TOPIC, true);
channel.queueDeclare("sample.queue", true, false, false, null);
channel.queueBind("sample.queue", "test", "#");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private void setupReceiving() {
try {
// register a consumer for messages
channel.basicConsume("sample.queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// just print the received message.
log.info("Received: " + new String(body, StandardCharsets.UTF_8));
}
});
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public void send(String message) {
try {
// send a message to the exchange
channel.basicPublish("test", "#", null, message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
You do not need to worry about closing connections as the RabbitMQClient
will close them for you on application shutdown.
Multiple RabbitMQ Clients
The extension supports having multiple RabbitMQ clients. You can add named RabbitMQ clients as follows.
quarkus.rabbitmqclient.<name>.virtual-host=/
quarkus.rabbitmqclient.<name>.username=<username>
quarkus.rabbitmqclient.<name>.password=<password>
quarkus.rabbitmqclient.<name>.hostname=localhost
quarkus.rabbitmqclient.<name>.port=5672
All configuration options that are available on the default non named RabbitMQ client are available. Injecting a named RabbitMQ client, e.g. foo, can be achieved as follows.
@ApplicationScoped
public class MessageService {
@Inject
@NamedRabbitMQClient("foo")
RabbitMQClient fooClient;
}
It is possible to use multiple RabbitMQ clients in the same class as long as they are all named, or in combination with the default client. The name default
is reserved for the default client and if used will trigger a deployment exception.
Metrics
Both Micrometer and SmallRye Metrics are supported and enabled by default if the quarkus-micrometer
or quarkus-smallrye-metrics
dependency is included in the project. If both are present micrometer will be used.
Metrics are gathered on a per-client basis and tagged with name=<client-name>
of the client. The default clients is tagged with name=default
and are all prefixed with rabbitmq
.
It is possible to opt-out of metrics by specifying quarkus.rabbitmqclient.metrics.enabled=false
. This will disable all metrics gathering.
Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Type |
Default |
|
---|---|---|
Disables health check Environment variable: |
boolean |
|
Disables metrics Environment variable: |
boolean |
|
Type |
Default |
|
Disable the client. Environment variable: |
boolean |
|
URI for connecting, formatted as amqp://userName:password@hostName:portNumber/virtualHost Environment variable: |
string |
|
Username for authentication Environment variable: |
string |
|
Password for authentication Environment variable: |
string |
|
Hostname for connecting Environment variable: |
string |
|
Virtual host Environment variable: |
string |
|
Port number for connecting Environment variable: |
int |
|
Connection timeout in milliseconds Environment variable: |
int |
|
Connection close timeout in milliseconds Environment variable: |
int |
|
Heartbeat interval in seconds Environment variable: |
int |
|
Handshake timeout in milliseconds Environment variable: |
int |
|
Shutdown timeout in milliseconds Environment variable: |
int |
|
Maximum number of channels per connection Environment variable: |
int |
|
Maximum frame size Environment variable: |
int |
|
Maximum body size of inbound (received) messages in bytes. Default value is 67,108,864 (64 MiB). Environment variable: |
int |
|
Network recovery interval in milliseconds Environment variable: |
int |
|
Channel RPC timeout in milliseconds Environment variable: |
int |
|
Validate channel RPC response type Environment variable: |
boolean |
|
Recover connection on failure Environment variable: |
boolean |
|
Recover topology on failure Environment variable: |
boolean |
|
SASL authentication mechanisms Environment variable: |
|
|
Client properties Environment variable: |
||
Type |
Default |
|
Hostname for connecting Environment variable: |
string |
required |
Port number for connecting Environment variable: |
int |
|
Type |
Default |
|
Enables TLS Environment variable: |
boolean |
|
TLS Algorithm to use Environment variable: |
string |
|
Trust store file Environment variable: |
string |
|
Trust store type Environment variable: |
string |
|
Trust store algorithm Environment variable: |
string |
|
Trust store password Environment variable: |
string |
|
Key store file Environment variable: |
string |
|
Key store password Environment variable: |
string |
|
Key store type Environment variable: |
string |
|
Key store algorithm Environment variable: |
string |
|
Validate server certificate Environment variable: |
boolean |
|
Verify hostname Environment variable: |
boolean |
|
Type |
Default |
|
Enables non blocking IO Environment variable: |
boolean |
|
Read buffer size in bytes Environment variable: |
int |
|
Write buffer size in bytes Environment variable: |
int |
|
Number of non blocking IO threads Environment variable: |
int |
|
Write enqueuing timeout in milliseconds Environment variable: |
int |
|
Write queue capacity. Environment variable: |
int |
|