Quarkus RabbitMQ Client

RabbitMQ is a popular message broker. This Quarkus extension provides a RabbitMQ client that is configured through application.properties.

Installation

To use this extension, add the quarkus-rabbitmq-client dependency to your pom.xml file:

<dependency>
    <groupId>io.quarkiverse.rabbitmqclient</groupId>
    <artifactId>quarkus-rabbitmq-client</artifactId>
    <version>3.2.0</version>
</dependency>

Usage

Assuming RabbitMQ is running on localhost:5672, add the following properties to your application.properties and fill in the values for <username> and <password>.

quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=<username>
quarkus.rabbitmqclient.password=<password>
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672

Once you have configured the properties, you can start using the RabbitMQ client.

package org.acme.rabbitmqclient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.*;
import io.quarkiverse.rabbitmqclient.RabbitMQClient;
import io.quarkus.runtime.StartupEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class MessageService {
    private static final Logger log = LoggerFactory.getLogger(MessageService.class);

    @Inject
    RabbitMQClient rabbitMQClient;

    private Channel channel;

    public void onApplicationStart(@Observes StartupEvent event) {
        // on application start, prepare the queues and the message listener
        setupQueues();
        setupReceiving();
    }

    private void setupQueues() {
        try {
            // create a connection
            Connection connection = rabbitMQClient.connect();
            // create a channel
            channel = connection.createChannel();
            // declare exchanges and queues
            channel.exchangeDeclare("test", BuiltinExchangeType.TOPIC, true);
            channel.queueDeclare("sample.queue", true, false, false, null);
            channel.queueBind("sample.queue", "test", "#");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void setupReceiving() {
        try {
            // register a consumer for messages
            channel.basicConsume("sample.queue", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // just log the received message
                    log.info("Received: {}", new String(body, StandardCharsets.UTF_8));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void send(String message) {
        try {
            // send a message to the exchange
            channel.basicPublish("test", "#", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

You do not need to worry about closing connections as the RabbitMQClient will close them for you on application shutdown.
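
The example above binds sample.queue to a topic exchange with the pattern "#", which is why every published message reaches the consumer. As a rough illustration of how topic patterns behave ("*" stands for exactly one dot-separated word, "#" for zero or more), here is a minimal sketch in plain Java. The class TopicPatternSketch is hypothetical and is not part of the extension or the RabbitMQ client; the broker does the real matching.

```java
/** Illustrative topic-pattern matcher; hypothetical helper, not a RabbitMQ API. */
final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    /** Returns true if the binding pattern matches the routing key. */
    static boolean matches(String pattern, String routingKey) {
        return match(words(pattern), 0, words(routingKey), 0);
    }

    // Split on dots; an empty string has zero words.
    private static String[] words(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.", -1);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern still expects a word
        }
        // '*' matches exactly one word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

With this sketch, TopicPatternSketch.matches("#", "orders.created") is true, while a narrower binding such as "orders.*" accepts "orders.created" but rejects "orders.created.eu".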

Multiple RabbitMQ Clients

The extension supports having multiple RabbitMQ clients. You can add named RabbitMQ clients as follows.

quarkus.rabbitmqclient.<name>.virtual-host=/
quarkus.rabbitmqclient.<name>.username=<username>
quarkus.rabbitmqclient.<name>.password=<password>
quarkus.rabbitmqclient.<name>.hostname=localhost
quarkus.rabbitmqclient.<name>.port=5672

All configuration options that are available on the default (unnamed) RabbitMQ client are also available on named clients. A named RabbitMQ client, e.g. foo, can be injected as follows.

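import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import io.quarkiverse.rabbitmqclient.NamedRabbitMQClient;
import io.quarkiverse.rabbitmqclient.RabbitMQClient;

@ApplicationScoped
public class MessageService {

    // injects the client configured under quarkus.rabbitmqclient.foo.*
    @Inject
    @NamedRabbitMQClient("foo")
    RabbitMQClient fooClient;
}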

Multiple RabbitMQ clients can be used in the same class, as long as each of them is named; named clients can also be combined with the default client. The name default is reserved for the default client, and using it for a named client triggers a deployment exception.

Disabling Clients

It is possible to disable a named client using the quarkus.rabbitmqclient.<client-name>.enabled=false configuration property. To disable the default client, use quarkus.rabbitmqclient.enabled=false.
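
To make the key layout for named clients concrete, the following minimal sketch builds the property keys for one named client in plain Java, for instance to generate configuration for a test. The class NamedClientConfigSketch and its method are hypothetical helpers, not part of the extension; only the key pattern quarkus.rabbitmqclient.<name>.<option> and the reserved name default come from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that assembles property keys for a named client. */
final class NamedClientConfigSketch {

    private static final String PREFIX = "quarkus.rabbitmqclient.";

    private NamedClientConfigSketch() {
    }

    static Map<String, String> forClient(String name, String hostname, int port, boolean enabled) {
        if (name == null || name.isBlank()) {
            throw new IllegalArgumentException("client name must not be blank");
        }
        if ("default".equals(name)) {
            // mirrors the rule that 'default' is reserved for the default client
            throw new IllegalArgumentException("'default' is reserved for the default client");
        }
        String base = PREFIX + name + ".";
        Map<String, String> props = new LinkedHashMap<>();
        props.put(base + "hostname", hostname);
        props.put(base + "port", Integer.toString(port));
        props.put(base + "enabled", Boolean.toString(enabled));
        return props;
    }
}
```

Calling NamedClientConfigSketch.forClient("foo", "localhost", 5672, false) yields the keys quarkus.rabbitmqclient.foo.hostname, quarkus.rabbitmqclient.foo.port and quarkus.rabbitmqclient.foo.enabled, the last one set to false so that the foo client is disabled.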

Metrics

Both Micrometer and SmallRye Metrics are supported and enabled by default if the quarkus-micrometer or quarkus-smallrye-metrics dependency is included in the project. If both are present, Micrometer will be used.

Metrics are gathered on a per-client basis and tagged with name=<client-name>. The default client is tagged with name=default. All metric names are prefixed with rabbitmq.

It is possible to opt out of metrics by specifying quarkus.rabbitmqclient.metrics.enabled=false. This disables all metrics gathering.

Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

| Configuration property | Environment variable | Type | Default |
|---|---|---|---|
| Enables the health check; set to false to disable it | QUARKUS_RABBITMQCLIENT_HEALTH_ENABLED | boolean | true |
| Enables metrics; set to false to disable them | QUARKUS_RABBITMQCLIENT_METRICS_ENABLED | boolean | true |

| RabbitMQ clients | Environment variable | Type | Default |
|---|---|---|---|
| Enables the client; set to false to disable it | QUARKUS_RABBITMQCLIENT_ENABLED | boolean | true |
| URI for connecting, formatted as amqp://userName:password@hostName:portNumber/virtualHost | QUARKUS_RABBITMQCLIENT_URI | string | |
| Username for authentication | QUARKUS_RABBITMQCLIENT_USERNAME | string | guest |
| Password for authentication | QUARKUS_RABBITMQCLIENT_PASSWORD | string | guest |
| Hostname for connecting | QUARKUS_RABBITMQCLIENT_HOSTNAME | string | localhost |
| Virtual host | QUARKUS_RABBITMQCLIENT_VIRTUAL_HOST | string | / |
| Port number for connecting | QUARKUS_RABBITMQCLIENT_PORT | int | -1 |
| Connection timeout in milliseconds | QUARKUS_RABBITMQCLIENT_CONNECTION_TIMEOUT | int | 60000 |
| Connection close timeout in milliseconds | QUARKUS_RABBITMQCLIENT_CONNECTION_CLOSE_TIMEOUT | int | -1 |
| Heartbeat interval in seconds | QUARKUS_RABBITMQCLIENT_REQUESTED_HEARTBEAT | int | 60 |
| Handshake timeout in milliseconds | QUARKUS_RABBITMQCLIENT_HANDSHAKE_TIMEOUT | int | 10000 |
| Shutdown timeout in milliseconds | QUARKUS_RABBITMQCLIENT_SHUTDOWN_TIMEOUT | int | 10000 |
| Maximum number of channels per connection | QUARKUS_RABBITMQCLIENT_REQUESTED_CHANNEL_MAX | int | 2047 |
| Maximum frame size | QUARKUS_RABBITMQCLIENT_REQUESTED_FRAME_MAX | int | 0 |
| Maximum body size of inbound (received) messages in bytes | QUARKUS_RABBITMQCLIENT_MAX_INBOUND_MESSAGE_BODY_SIZE | int | 67108864 (64 MiB) |
| Network recovery interval in milliseconds | QUARKUS_RABBITMQCLIENT_NETWORK_RECOVERY_INTERVAL | int | 5000 |
| Channel RPC timeout in milliseconds | QUARKUS_RABBITMQCLIENT_CHANNEL_RPC_TIMEOUT | int | 600000 |
| Validate channel RPC response type | QUARKUS_RABBITMQCLIENT_CHANNEL_RPC_RESPONSE_TYPE_CHECK | boolean | false |
| Recover connection on failure | QUARKUS_RABBITMQCLIENT_CONNECTION_RECOVERY | boolean | true |
| Recover topology on failure | QUARKUS_RABBITMQCLIENT_TOPOLOGY_RECOVERY | boolean | true |
| SASL authentication mechanisms | QUARKUS_RABBITMQCLIENT_SASL | plain, external | plain |
| Client properties | QUARKUS_RABBITMQCLIENT_PROPERTIES__PROPERTY_NAME_ | String | |
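
The URI property in the reference above packs the username, password, hostname, port and virtual host into a single value. As a minimal sketch of how such a value breaks down, the following plain Java uses java.net.URI to split it into its parts. The class AmqpUriSketch is hypothetical and only illustrates the format; it is not how the extension or the RabbitMQ client parses the URI.

```java
import java.net.URI;

/** Hypothetical holder for the parts of an AMQP connection URI. */
final class AmqpUriSketch {

    final String username;    // null when the URI carries no user info
    final String password;    // null when the URI carries no password
    final String host;
    final int port;           // -1 when the URI does not specify a port
    final String virtualHost; // null when the URI has no path

    private AmqpUriSketch(String username, String password, String host, int port, String virtualHost) {
        this.username = username;
        this.password = password;
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
    }

    static AmqpUriSketch parse(String value) {
        URI uri = URI.create(value);
        String scheme = uri.getScheme();
        if (!"amqp".equalsIgnoreCase(scheme) && !"amqps".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("not an AMQP URI: " + value);
        }
        String username = null;
        String password = null;
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            username = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }
        // getPath() returns the percent-decoded path; drop the leading slash
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? null : path.substring(1);
        return new AmqpUriSketch(username, password, uri.getHost(), uri.getPort(), virtualHost);
    }
}
```

Note that the default virtual host / has to be percent-encoded as %2f inside the URI path, so amqp://guest:secret@localhost:5672/%2f refers to the virtual host /, whereas amqp://guest:secret@localhost:5672/foo refers to a virtual host named foo.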

| Broker addresses for creating connections | Environment variable | Type | Default |
|---|---|---|---|
| Hostname for connecting | QUARKUS_RABBITMQCLIENT_ADDRESSES__BROKER_NAME__HOSTNAME | string | required |
| Port number for connecting | QUARKUS_RABBITMQCLIENT_ADDRESSES__BROKER_NAME__PORT | int | 0 |

| TLS configuration | Environment variable | Type | Default |
|---|---|---|---|
| Enables TLS | QUARKUS_RABBITMQCLIENT_TLS_ENABLED | boolean | false |
| TLS algorithm to use | QUARKUS_RABBITMQCLIENT_TLS_ALGORITHM | string | TLSv1.2 |
| Trust store file | QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_FILE | string | |
| Trust store type | QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_TYPE | string | JKS |
| Trust store algorithm | QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_ALGORITHM | string | SunX509 |
| Trust store password | QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_PASSWORD | string | |
| Key store file | QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_FILE | string | |
| Key store password | QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_PASSWORD | string | |
| Key store type | QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_TYPE | string | PKCS12 |
| Key store algorithm | QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_ALGORITHM | string | SunX509 |
| Validate server certificate | QUARKUS_RABBITMQCLIENT_TLS_VALIDATE_SERVER_CERTIFICATE | boolean | true |
| Verify hostname | QUARKUS_RABBITMQCLIENT_TLS_VERIFY_HOSTNAME | boolean | true |

| Non-blocking IO configuration | Environment variable | Type | Default |
|---|---|---|---|
| Enables non-blocking IO | QUARKUS_RABBITMQCLIENT_NIO_ENABLED | boolean | false |
| Read buffer size in bytes | QUARKUS_RABBITMQCLIENT_NIO_READ_BYTE_BUFFER_SIZE | int | 32768 |
| Write buffer size in bytes | QUARKUS_RABBITMQCLIENT_NIO_WRITE_BYTE_BUFFER_SIZE | int | 32768 |
| Number of non-blocking IO threads | QUARKUS_RABBITMQCLIENT_NIO_THREADS | int | 1 |
| Write enqueuing timeout in milliseconds | QUARKUS_RABBITMQCLIENT_NIO_WRITE_ENQUEUING_TIMEOUT | int | 10000 |
| Write queue capacity | QUARKUS_RABBITMQCLIENT_NIO_WRITE_QUEUE_CAPACITY | int | 10000 |
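
The environment variable names listed in this reference appear to follow the usual Quarkus (MicroProfile Config) convention: every character of the property name that is not a letter, digit or underscore is replaced by an underscore, and the result is upper-cased. The following minimal sketch reproduces that mapping in plain Java. The class EnvVarNameSketch is hypothetical, and the quoted property spelling used for the broker address is an assumption based on the environment variable shown above.

```java
import java.util.Locale;

/** Hypothetical helper that derives an environment variable name from a property name. */
final class EnvVarNameSketch {

    private EnvVarNameSketch() {
    }

    static String toEnvVar(String propertyName) {
        StringBuilder sb = new StringBuilder(propertyName.length());
        for (int i = 0; i < propertyName.length(); i++) {
            char c = propertyName.charAt(i);
            boolean keep = (c >= 'a' && c <= 'z')
                    || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9')
                    || c == '_';
            // dots, dashes and quotes all collapse to underscores
            sb.append(keep ? c : '_');
        }
        return sb.toString().toUpperCase(Locale.ROOT);
    }
}
```

For example, quarkus.rabbitmqclient.virtual-host maps to QUARKUS_RABBITMQCLIENT_VIRTUAL_HOST, and a quoted map key such as quarkus.rabbitmqclient.addresses."broker-name".hostname maps to QUARKUS_RABBITMQCLIENT_ADDRESSES__BROKER_NAME__HOSTNAME, which explains the double underscores in the reference.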