Quarkus - Hivemq Client Overview

This extension allows usage of the HiveMQ MQTT Client inside a Quarkus App, in JVM and Native mode.

Together with the "SmallRye Reactive Messaging MQTT" extension allows usage of a new connector type smallrye-mqtt-hivemq that will use HiveMQ MQTT Client instead of Vertx MQTT client.

This adds some benefits to the original SmallRye MQTT:

  • Battle tested MQTT Client outside Vertx landscape

  • Management of external CA file for secure connections with self-signed certificates

  • Backpressure support integrated with MQTT QOS

  • Automatic and configurable reconnection handling and message redelivery

  • Real Health Check against a configurable topic (defaults to the standard MQTT $SYS/broker/uptime) integrated in Quarkus HealthReport

  • Many others you can read in official documentation here.

Matrix compatibility

Quarkus-hivemq-client version Quarkus version HiveMQ client version

1.0.0

2.16.10.Final

1.3.2

Using the HiveMQ-SmallRye Extension

Requirements:

  • Maven 3.8.1+

  • JDK 11+ installed with JAVA_HOME configured appropriately

Create a Quarkus application with the HiveMQ-SmallRye extension

mvn io.quarkus.platform:quarkus-maven-plugin:2.16.10.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=hivemq-quickstart \
    -DclassName="org.acme.quickstart.GreetingResource" \
    -Dpath="/hello" \
    -Dextensions="resteasy-reactive,hivemq,smallrye-reactive-messaging-mqtt"
cd hivemq-quickstart

If you already have your Quarkus project configured, you can add the hivemq extension to your project by running the following command in your project base directory:

./mvnw quarkus:add-extension -Dextensions="hivemq"

This command will add the following dependency to your pom.xml file:

<dependency>
    <groupId>io.quarkiverse.hivemqclient</groupId>
    <artifactId>quarkus-hivemq-client</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-mqtt</artifactId>
</dependency>

Then configure your application by adding the HiveMQ connector type:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-mqtt-hivemq

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-mqtt-hivemq

Receiving messages from MQTT

Let’s imagine you have a HiveMQ broker running, and accessible using the localhost:1883 address. Configure your application to receive MQTT messages on the prices channel as follows:

mp.messaging.incoming.prices.connector=smallrye-mqtt-hivemq
mp.messaging.incoming.prices.host=localhost
mp.messaging.incoming.prices.port=1883
mp.messaging.incoming.prices.auto-generated-client-id=true

Then, your application receives Message<byte[]>. You can consumes the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MqttPriceConsumer {

    @Incoming("prices")
    public void consume(byte[] raw) {
        double price = Double.parseDouble(new String(raw));

        // process your price.
    }

}

If you want a more detailed example, please take a look the following end-to-end Test

Configuration Reference

Configuration property

Type

Default

Direction

reconnect-attempts

Set the max reconnect attempts.

int

5

INCOMING_AND_OUTGOING

reconnect-interval-seconds

Set the reconnect interval in seconds.

int

1

INCOMING_AND_OUTGOING

ca-cart-file

File containing the self-signed CA for SSL connection.

string

INCOMING_AND_OUTGOING

check-topic-enabled

Enable check for liveness/readiness.

boolean

false

INCOMING_AND_OUTGOING

check-topic-name

Topic Used to check liveness/readiness.

string

$SYS/broker/uptime

INCOMING_AND_OUTGOING

readiness-timeout

Timeout to declare the MQTT Client not ready (in ms).

int

20000

INCOMING_AND_OUTGOING

liveness-timeout

Timeout to declare the MQTT Client not alive.

int

120000

INCOMING_AND_OUTGOING