Quarkus Reactive Messaging NATS JetStream

This extension allows the use of NATS JetStream inside a Quarkus application, in both JVM and native mode.

The extension implements a new connector type, quarkus-jetstream, in SmallRye Reactive Messaging. The connector uses the NATS client.

Installation

To use this extension, first add the io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-reactive-messsaging-nats-jetstream extension to your build file.

For instance, with Maven, add the following dependency to your POM file:

<dependency>
    <groupId>io.quarkiverse.reactivemessaging.nats-jetstream</groupId>
    <artifactId>quarkus-reactive-messsaging-nats-jetstream</artifactId>
    <version>1.3.3</version>
</dependency>

Then configure your application by adding the NATS JetStream connector type:

# Inbound
mp.messaging.incoming.[channel-name].connector=quarkus-jetstream

# Outbound
mp.messaging.outgoing.[channel-name].connector=quarkus-jetstream

Receiving messages from NATS JetStream

Assume you have a NATS JetStream broker running and accessible at localhost:4242. Configure your application to receive NATS messages on the data channel, from the stream named test and the subject named data, as follows:

quarkus.reactive-messaging.nats.servers=nats://localhost:4242
quarkus.reactive-messaging.nats.username=guest
quarkus.reactive-messaging.nats.password=guest
quarkus.reactive-messaging.nats.ssl-enabled=false

# Streams and subjects are auto-created by default based on channel configuration
quarkus.reactive-messaging.nats.jet-stream.auto-configure=true

mp.messaging.incoming.data.connector=quarkus-jetstream
mp.messaging.incoming.data.stream=test
mp.messaging.incoming.data.subject=data

Your application then receives Message<Data>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DataConsumer {

    @Incoming("data")
    public void consume(Data data) {
        // process your data.
    }

}
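
The consumer above takes a Data payload, but that type is not shown. The following is a minimal sketch of what it might look like. The field names (id, value) are hypothetical, and the JavaBean shape (no-arg constructor plus getters and setters) is an assumption based on what object mappers commonly need, not something taken from the extension.

```java
// Hypothetical payload type for the "data" channel; field names are illustrative.
// Declare it in the same package as DataConsumer (inbound), and make the class
// public if other packages need to use it.
class Data {

    private String id;
    private int value;

    // No-arg constructor, typically required when a payload is mapped from JSON.
    public Data() {
    }

    public Data(String id, int value) {
        this.id = id;
        this.value = value;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }
}
```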

If you want more examples, please take a look at the tests of the extension.

Configuration

Table 1. NATS configuration

quarkus.reactive-messaging.nats.servers

A comma-separated list of URIs, in the form nats://{host}:{port}, used to establish the initial connection to the NATS cluster.

quarkus.reactive-messaging.nats.username

The username to connect to the NATS server

quarkus.reactive-messaging.nats.password

The password to connect to the NATS server

quarkus.reactive-messaging.nats.ssl-enabled

Whether to enable SSL/TLS secure connections to the NATS server

quarkus.reactive-messaging.nats.max-reconnects

The maximum number of reconnect attempts

quarkus.reactive-messaging.nats.connection-timeout

The connection timeout in milliseconds

quarkus.reactive-messaging.nats.error-listener

The class name of the error listener

quarkus.reactive-messaging.nats.buffer-size

The size, in bytes, of the buffers used for connections
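
To illustrate the format described for quarkus.reactive-messaging.nats.servers, here is a small sketch that splits such a value and checks each entry against the nats://{host}:{port} shape. NatsServers is a hypothetical helper written for this example only. It is not part of the extension, and the extension may accept other schemes or handle whitespace differently.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the extension: shows the expected shape
// of a quarkus.reactive-messaging.nats.servers value.
final class NatsServers {

    static List<URI> parse(String value) {
        List<URI> servers = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // this sketch tolerates stray commas and whitespace
            }
            URI uri = URI.create(trimmed);
            // Assumption of this sketch: only nats://{host}:{port} is accepted.
            if (!"nats".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException("Expected nats://{host}:{port} but got: " + trimmed);
            }
            servers.add(uri);
        }
        return servers;
    }
}
```

For example, NatsServers.parse("nats://localhost:4242, nats://other-host:4222") yields two URIs, while an entry without a port is rejected.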

Table 2. NATS JetStream configuration

quarkus.reactive-messaging.nats.jet-stream.auto-configure

Auto-configure streams and subjects based on the channel configuration

quarkus.reactive-messaging.nats.jet-stream.replicas

The number of replicas on which a message must be stored. The default value is 1.

quarkus.reactive-messaging.nats.jet-stream.storage-type

The storage type for stream data (File or Memory)

quarkus.reactive-messaging.nats.jet-stream.retention-policy

Declares the retention policy for the stream (Limits or Interest)

Channel configuration

Table 3. Subscriber processor attributes

mp.messaging.outgoing.[channel-name].stream

The stream to which outgoing messages are published

mp.messaging.outgoing.[channel-name].subject

The subject to which outgoing messages are published

mp.messaging.outgoing.[channel-name].trace-enabled

Enable traces for subscriber

mp.messaging.outgoing.[channel-name].auto-configure

Auto configure subject on NATS
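
As a sketch of how the attributes in Table 3 fit together, an outgoing channel could be configured as shown below. The channel name data-out is hypothetical; the stream and subject reuse the names from the receiving example.

```properties
# "data-out" is a hypothetical channel name used only for illustration
mp.messaging.outgoing.data-out.connector=quarkus-jetstream
mp.messaging.outgoing.data-out.stream=test
mp.messaging.outgoing.data-out.subject=data
```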

Table 4. Publisher processor attributes

mp.messaging.incoming.[channel-name].stream

The stream from which incoming messages are consumed

mp.messaging.incoming.[channel-name].subject

The subject from which incoming messages are consumed

mp.messaging.incoming.[channel-name].trace-enabled

Enable traces for publisher

mp.messaging.incoming.[channel-name].auto-configure

Auto configure subject on NATS

mp.messaging.incoming.[channel-name].ordered

Flag indicating whether this subscription should be ordered

mp.messaging.incoming.[channel-name].deliver-group

The optional deliver group to join

mp.messaging.incoming.[channel-name].durable

Sets the durable name for the consumer

mp.messaging.incoming.[channel-name].max-deliver

The maximum number of times a specific message delivery will be attempted

mp.messaging.incoming.[channel-name].back-off

The timing of re-deliveries as a comma-separated list of durations

mp.messaging.incoming.[channel-name].payload-type

The class name of the payload type

mp.messaging.incoming.[channel-name].pull

The subscription type

mp.messaging.incoming.[channel-name].pull.batch-size

The size of batch of messages to be pulled in pull mode

mp.messaging.incoming.[channel-name].pull.repull-at

The point in the current batch to tell the server to start the next batch

mp.messaging.incoming.[channel-name].pull.poll-timeout

The poll timeout in milliseconds. Use 0 to wait indefinitely.

mp.messaging.incoming.[channel-name].retry-backoff
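
The pull-related attributes in Table 4 could be combined roughly as follows for the data channel. This is a sketch only: the numeric values are illustrative, and treating pull as a boolean flag is an assumption, since the table only describes it as the subscription type.

```properties
# Illustrative values only; adjust to your workload
# Assumption: "pull" takes a boolean, with true selecting a pull subscription
mp.messaging.incoming.data.pull=true
# Request messages from the server in batches of 100
mp.messaging.incoming.data.pull.batch-size=100
# Ask for the next batch once 50 messages of the current batch have arrived
mp.messaging.incoming.data.pull.repull-at=50
# Wait up to 500 ms when polling (0 would wait indefinitely)
mp.messaging.incoming.data.pull.poll-timeout=500
```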

NATS JetStream

This extension utilizes the NATS JetStream client to connect to a NATS JetStream broker.

Further documentation can be found in the NATS documentation.

Reactive Messaging

This extension utilizes SmallRye Reactive Messaging to build data streaming applications.

To go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.