Quarkus RabbitMQ Client

RabbitMQ is a popular message broker. This Quarkus extension provides a client for RabbitMQ which is configurable using the application.properties.

Installation

If you want to use this extension, you need to add the quarkus-rabbitmq-client extension first. In your pom.xml file, add:

<dependency>
    <groupId>io.quarkiverse.rabbitmqclient</groupId>
    <artifactId>quarkus-rabbitmq-client</artifactId>
    <version>3.3.0</version>
</dependency>

Migration

When updating to a new version of the extension always make sure you check out the changelog

Usage

Assuming you have RabbitMQ running on localhost:5672 you should add the following properties to your application.properties and fill in the values for <username> and <password>.

quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=<username>
quarkus.rabbitmqclient.password=<password>
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672

Once you have configured the properties, you can start using the RabbitMQ client.

package org.acme.rabbitmqclient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.*;
import io.quarkiverse.rabbitmqclient.RabbitMQClient;
import io.quarkus.runtime.StartupEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class MessageService {
    private static final Logger log = LoggerFactory.getLogger(MessageService.class);
    @Inject
    RabbitMQClient rabbitMQClient;
    private Channel channel;
    public void onApplicationStart(@Observes StartupEvent event) {
        // on application start prepare the queus and message listener
        setupQueues();
        setupReceiving();
    }

    private void setupQueues() {
        try {
            // create a connection
            Connection connection = rabbitMQClient.connect();
            // create a channel
            channel = connection.createChannel();
            // declare exchanges and queues
            channel.exchangeDeclare("test", BuiltinExchangeType.TOPIC, true);
            channel.queueDeclare("sample.queue", true, false, false, null);
            channel.queueBind("sample.queue", "test", "#");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void setupReceiving() {
        try {
            // register a consumer for messages
            channel.basicConsume("sample.queue", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // just print the received message.
                    log.info("Received: " + new String(body, StandardCharsets.UTF_8));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void send(String message) {
        try {
            // send a message to the exchange
            channel.basicPublish("test", "#", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

You do not need to worry about closing connections as the RabbitMQClient will close them for you on application shutdown.

Multiple RabbitMQ Clients

The extension supports having multiple RabbitMQ clients. You can add named RabbitMQ clients as follows.

quarkus.rabbitmqclient.<name>.virtual-host=/
quarkus.rabbitmqclient.<name>.username=<username>
quarkus.rabbitmqclient.<name>.password=<password>
quarkus.rabbitmqclient.<name>.hostname=localhost
quarkus.rabbitmqclient.<name>.port=5672

All configuration options that are available on the default non named RabbitMQ client are available. Injecting a named RabbitMQ client, e.g. foo, can be achieved as follows.

@ApplicationScoped
public class MessageService {

    @Inject
    @NamedRabbitMQClient("foo")
    RabbitMQClient fooClient;
}

It is possible to use multiple RabbitMQ clients in the same class as long as they are all named, or in combination with the default client. The name default is reserved for the default client and if used will trigger a deployment exception.

Credentials Provider

The extension supports retrieve login/password from Credentials Provider(io.quarkus.credentials.CredentialsProvider) for example Hashicorp Vault. Credentials Provider should be present and configured.

quarkus.rabbitmqclient.<name>.virtual-host=/
quarkus.rabbitmqclient.<name>.hostname=localhost
quarkus.rabbitmqclient.<name>.port=5672
quarkus.rabbitmqclient.credentials-provider=custom

The property quarkus.rabbitmqclient.credentials-provider should contain identifier by which credential provider return login/password.

In case the multiple credentials providers exist For Vault, the credentials provider bean name is vault-credentials-provider from extension https://docs.quarkiverse.io/quarkus-vault/dev/index.html Or use annotation @Named("custom-provider-name").

quarkus.rabbitmqclient.<name>.virtual-host=/
quarkus.rabbitmqclient.<name>.hostname=localhost
quarkus.rabbitmqclient.<name>.port=5672
quarkus.rabbitmqclient.credentials-provider=custom
quarkus.rabbitmqclient.credentials-provider-name=vault-credentials-provider
Login/password can overwrite settings from properties.

If credential is renewable and has "expires-at" will be create com.rabbitmq.client.impl.DefaultCredentialsRefreshService

Disabling Clients

It is possible to disable clients using the quarkus.rabbitmqclient.<client-name>.client-enabled=false configuration property. To disable the default client, use quarkus.rabbitmqclient.client-enabled=false.

Metrics

Both Micrometer and OpenTelemetry Metrics are supported. Metrics are enabled by default. You can opt-out of metrics opt-out by specifying quarkus.rabbitmqclient.metrics.enabled=false. This will disable all metrics gathering.

Metrics are gathered on a per-client basis and tagged with name=<client-id> of the client. The default client is tagged with name=default and all metrics are prefixed with rabbitmq.

If both extension are present, Micrometer will be preferred over OpenTelemetry.

Micrometer Metrics

To enable Micrometer metrics add the quarkus-micrometer extension to the project and make sure it is enabled.

OpenTelemetry Metrics

To enable OpenTelemetry metrics add the quarkus-opentelemetry extension to the project and make sure you enable it, as metrics is not enabled by default. To enable it set:

quarkus.otel.enabled=true
quarkus.otel.metrics.enabled=true

Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Disables health check

Environment variable: QUARKUS_RABBITMQCLIENT_HEALTH_ENABLED

boolean

true

Disables metrics

Environment variable: QUARKUS_RABBITMQCLIENT_METRICS_ENABLED

boolean

true

RabbitMQ clients

Type

Default

quarkus.rabbitmqclient."client-name".id

A unique RabbitMQ client identifier.

Environment variable: QUARKUS_RABBITMQCLIENT_ID

string

quarkus.rabbitmqclient."client-name".client-enabled

Enables the client.

Environment variable: QUARKUS_RABBITMQCLIENT_CLIENT_ENABLED

boolean

true

quarkus.rabbitmqclient."client-name".uri

URI for connecting, formatted as amqp://userName:password@hostName:portNumber/virtualHost

Environment variable: QUARKUS_RABBITMQCLIENT_URI

string

quarkus.rabbitmqclient."client-name".username

Username for authentication

Environment variable: QUARKUS_RABBITMQCLIENT_USERNAME

string

guest

quarkus.rabbitmqclient."client-name".password

Password for authentication

Environment variable: QUARKUS_RABBITMQCLIENT_PASSWORD

string

guest

quarkus.rabbitmqclient."client-name".credentials-provider

The credentials provider name.

Environment variable: QUARKUS_RABBITMQCLIENT_CREDENTIALS_PROVIDER

string

quarkus.rabbitmqclient."client-name".credentials-provider-name

The credentials provider bean name.

This is a bean name (as in @Named) of a bean that implements CredentialsProvider. It is used to select the credentials provider bean when multiple exist. This is unnecessary when there is only one credentials provider available.

For Vault, the credentials provider bean name is vault-credentials-provider.

Environment variable: QUARKUS_RABBITMQCLIENT_CREDENTIALS_PROVIDER_NAME

string

quarkus.rabbitmqclient."client-name".hostname

Hostname for connecting

Environment variable: QUARKUS_RABBITMQCLIENT_HOSTNAME

string

localhost

quarkus.rabbitmqclient."client-name".virtual-host

Virtual host

Environment variable: QUARKUS_RABBITMQCLIENT_VIRTUAL_HOST

string

/

quarkus.rabbitmqclient."client-name".port

Port number for connecting

Environment variable: QUARKUS_RABBITMQCLIENT_PORT

int

-1

quarkus.rabbitmqclient."client-name".connection-timeout

Connection timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_CONNECTION_TIMEOUT

int

60000

quarkus.rabbitmqclient."client-name".connection-close-timeout

Connection close timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_CONNECTION_CLOSE_TIMEOUT

int

-1

quarkus.rabbitmqclient."client-name".requested-heartbeat

Heartbeat interval in seconds

Environment variable: QUARKUS_RABBITMQCLIENT_REQUESTED_HEARTBEAT

int

60

quarkus.rabbitmqclient."client-name".handshake-timeout

Handshake timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_HANDSHAKE_TIMEOUT

int

10000

quarkus.rabbitmqclient."client-name".shutdown-timeout

Shutdown timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_SHUTDOWN_TIMEOUT

int

10000

quarkus.rabbitmqclient."client-name".requested-channel-max

Maximum number of channels per connection

Environment variable: QUARKUS_RABBITMQCLIENT_REQUESTED_CHANNEL_MAX

int

2047

quarkus.rabbitmqclient."client-name".requested-frame-max

Maximum frame size

Environment variable: QUARKUS_RABBITMQCLIENT_REQUESTED_FRAME_MAX

int

0

quarkus.rabbitmqclient."client-name".max-inbound-message-body-size

Maximum body size of inbound (received) messages in bytes.

Default value is 67,108,864 (64 MiB).

Environment variable: QUARKUS_RABBITMQCLIENT_MAX_INBOUND_MESSAGE_BODY_SIZE

int

67108864

quarkus.rabbitmqclient."client-name".network-recovery-interval

Network recovery interval in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_NETWORK_RECOVERY_INTERVAL

int

5000

quarkus.rabbitmqclient."client-name".channel-rpc-timeout

Channel RPC timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_CHANNEL_RPC_TIMEOUT

int

600000

quarkus.rabbitmqclient."client-name".channel-rpc-response-type-check

Validate channel RPC response type

Environment variable: QUARKUS_RABBITMQCLIENT_CHANNEL_RPC_RESPONSE_TYPE_CHECK

boolean

false

quarkus.rabbitmqclient."client-name".connection-recovery

Recover connection on failure

Environment variable: QUARKUS_RABBITMQCLIENT_CONNECTION_RECOVERY

boolean

true

quarkus.rabbitmqclient."client-name".topology-recovery

Recover topology on failure

Environment variable: QUARKUS_RABBITMQCLIENT_TOPOLOGY_RECOVERY

boolean

true

quarkus.rabbitmqclient."client-name".sasl

SASL authentication mechanisms

Environment variable: QUARKUS_RABBITMQCLIENT_SASL

plain, external

plain

quarkus.rabbitmqclient."client-name".properties."property-name"

Client properties

Environment variable: QUARKUS_RABBITMQCLIENT_PROPERTIES__PROPERTY_NAME_

Map<String,String>

Broker addresses for creating connections

Type

Default

quarkus.rabbitmqclient."client-name".addresses."broker-name".hostname

Hostname for connecting

Environment variable: QUARKUS_RABBITMQCLIENT_ADDRESSES__BROKER_NAME__HOSTNAME

string

required

quarkus.rabbitmqclient."client-name".addresses."broker-name".port

Port number for connecting

Environment variable: QUARKUS_RABBITMQCLIENT_ADDRESSES__BROKER_NAME__PORT

int

0

Tls configuration

Type

Default

quarkus.rabbitmqclient."client-name".tls.enabled

Enables TLS

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_ENABLED

boolean

false

quarkus.rabbitmqclient."client-name".tls.algorithm

TLS Algorithm to use

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_ALGORITHM

string

TLSv1.2

quarkus.rabbitmqclient."client-name".tls.trust-store-file

Trust store file

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_FILE

string

quarkus.rabbitmqclient."client-name".tls.trust-store-type

Trust store type

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_TYPE

string

JKS

quarkus.rabbitmqclient."client-name".tls.trust-store-algorithm

Trust store algorithm

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_ALGORITHM

string

SunX509

quarkus.rabbitmqclient."client-name".tls.trust-store-password

Trust store password

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_TRUST_STORE_PASSWORD

string

quarkus.rabbitmqclient."client-name".tls.key-store-file

Key store file

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_FILE

string

quarkus.rabbitmqclient."client-name".tls.key-store-password

Key store password

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_PASSWORD

string

quarkus.rabbitmqclient."client-name".tls.key-store-type

Key store type

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_TYPE

string

PKCS12

quarkus.rabbitmqclient."client-name".tls.key-store-algorithm

Key store algorithm

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_KEY_STORE_ALGORITHM

string

SunX509

quarkus.rabbitmqclient."client-name".tls.validate-server-certificate

Validate server certificate

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_VALIDATE_SERVER_CERTIFICATE

boolean

true

quarkus.rabbitmqclient."client-name".tls.verify-hostname

Verify hostname

Environment variable: QUARKUS_RABBITMQCLIENT_TLS_VERIFY_HOSTNAME

boolean

true

Non-blocking IO configuration

Type

Default

quarkus.rabbitmqclient."client-name".nio.enabled

Enables non blocking IO

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_ENABLED

boolean

false

quarkus.rabbitmqclient."client-name".nio.read-byte-buffer-size

Read buffer size in bytes

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_READ_BYTE_BUFFER_SIZE

int

32768

quarkus.rabbitmqclient."client-name".nio.write-byte-buffer-size

Write buffer size in bytes

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_WRITE_BYTE_BUFFER_SIZE

int

32768

quarkus.rabbitmqclient."client-name".nio.threads

Number of non blocking IO threads

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_THREADS

int

1

quarkus.rabbitmqclient."client-name".nio.write-enqueuing-timeout

Write enqueuing timeout in milliseconds

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_WRITE_ENQUEUING_TIMEOUT

int

10000

quarkus.rabbitmqclient."client-name".nio.write-queue-capacity

Write queue capacity.

Environment variable: QUARKUS_RABBITMQCLIENT_NIO_WRITE_QUEUE_CAPACITY

int

10000