Quarkus Reactive messaging Nats Jetstream

This extension allow usage of NATS JetStream inside a Quarkus App, in JVM and Native mode.

The extension implements a new connector type quarkus-jetstream in SmallRye Reactive Messaging that will use the NATS client.

Installation

If you want to use this extension, you need to add the io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-reactive-messaging-nats-jetstream extension first to your build file.

For instance, with Maven, add the following dependency to your POM file:

<dependency>
    <groupId>io.quarkiverse.reactivemessaging.nats-jetstream</groupId>
    <artifactId>quarkus-messaging-nats-jetstream</artifactId>
    <version>3.17.14</version>
</dependency>

Then configure your application by adding the NATS JetStream connector type:

# Inbound
mp.messaging.incoming.[channel-name].connector=quarkus-jetstream

# Outbound
mp.messaging.outgoing.[channel-name].connector=quarkus-jetstream

Receiving messages from NATS JetStream

Let’s imagine you have a NATS JetStream broker running, and accessible using the localhost:4242 address. Configure your application to receive NATS messages on the data channel from the stream named: test and the subject named: data as follows:

quarkus.messaging.nats.servers=nats://localhost:4242
quarkus.messaging.nats.username=guest
quarkus.messaging.nats.password=guest
quarkus.messaging.nats.ssl-enabled=false

# Streams and subjects are auto-created by default based on channel configuration
quarkus.messaging.nats.jet-stream.auto-configure=true
quarkus.messaging.nats.jet-stream.streams[0].name=test
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=data

mp.messaging.incoming.data.connector=quarkus-jetstream
mp.messaging.incoming.data.stream=test
mp.messaging.incoming.data.subject=data

Then, your application receives Message<Data>. You can consumes the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DataConsumer {

    @Incoming("data")
    public void consume(Data data) {
        // process your data.
    }

}

If you want more examples, please take a look at the tests of the extension.

Configuration

Table 1. NATS configuration

quarkus.messaging.nats.servers

A comma-separated list of URI’s nats://{host}:{port} to use for establishing the initial connection to the NATS cluster.

quarkus.messaging.nats.username

The username to connect to the NATS server

quarkus.messaging.nats.password

The password to connect to the NATS server

quarkus.messaging.nats.token

The token to connect to the NATS server

quarkus.messaging.nats.credential-path

The path to the credentials file to connect to the NATS server

quarkus.messaging.nats.ssl-enabled

Whether to enable SSL/TLS secure connections to the NATS server

quarkus.messaging.nats.connection-timeout

The connection timeout in milliseconds

quarkus.messaging.nats.error-listener

The classname for the error listener

quarkus.messaging.nats.buffer-size

The size in bytes to make buffers for connections

quarkus.messaging.nats.keystore-path

The path to the keystore file

quarkus.messaging.nats.keystore-password

The password for the keystore

quarkus.messaging.nats.truststore-path

The path to the trust store file

quarkus.messaging.nats.truststore-password

The password for the trust store

quarkus.messaging.nats.tls-algorithm

The TLS algorithm

quarkus.messaging.nats.connection-attempts

The maximum number of attempts to attempt to re-connect to NATS

quarkus.messaging.nats.connection-backoff

Back-off delay between to attempt to re-connect to NATS

Table 2. NATS JetStream configuration

quarkus.messaging.nats.jet-stream.auto-configure

Autoconfigure stream and subjects based on channel configuration

quarkus.messaging.nats.jet-stream.trace

Enable tracing

quarkus.messaging.nats.jet-stream.streams[i].replicas

The number of replicas a message must be stored. Default value is 1

quarkus.messaging.nats.jet-stream.streams[i].storage-type

The storage type for stream data (File or Memory)

quarkus.messaging.nats.jet-stream.streams[i].retention-policy

Declares the retention policy for the stream (Limits or Interest)

quarkus.messaging.nats.jet-stream.streams[i].name

Name of the stream to setup if auto-configure is enabled

quarkus.messaging.nats.jet-stream.streams[i].subjects[n]

Name of the subject in stream to setup if auto-configure is enabled

quarkus.messaging.nats.jet-stream.streams[i].overwrite

If the stream already exists it will be overwritten

quarkus.messaging.nats.jet-stream.streams[i].description

Description of stream

quarkus.messaging.nats.jet-stream.streams[i].compression-option

The compression option for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-consumers

The maximum number of consumers for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-messages

The maximum messages for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-messages-per-subject

The maximum messages per subject for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-bytes

The maximum number of bytes for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-age

The maximum message age for this stream

quarkus.messaging.nats.jet-stream.streams[i].maximum-message-size

The maximum message size for this stream

quarkus.messaging.nats.jet-stream.streams[i].template-owner

The template json for this stream

quarkus.messaging.nats.jet-stream.streams[i].discard-policy

The discard policy for this stream

quarkus.messaging.nats.jet-stream.streams[i].duplicate-window

The duplicate checking window stream configuration. Duration. ZERO means duplicate checking is not enabled

quarkus.messaging.nats.jet-stream.streams[i].allow-rollup

The flag indicating if the stream allows rollup

quarkus.messaging.nats.jet-stream.streams[i].allow-direct

The flag indicating if the stream allows direct message access

quarkus.messaging.nats.jet-stream.streams[i].mirror-direct

The flag indicating if the stream allows higher performance and unified direct access for mirrors as well

quarkus.messaging.nats.jet-stream.streams[i].deny-delete

The flag indicating if deny delete is set for the stream

quarkus.messaging.nats.jet-stream.streams[i].deny-purge

The flag indicating if deny purge is set for the stream

quarkus.messaging.nats.jet-stream.streams[i].discard-new-per-subject

Whether discard policy with max message per subject is applied per subject

quarkus.messaging.nats.jet-stream.streams[i].first-sequence

The first sequence used in the stream

quarkus.messaging.nats.jet-stream.key-value-stores[i].bucket-name

Name of Key-Value store

quarkus.messaging.nats.jet-stream.key-value-stores[i].description

Description of Key-Value store

quarkus.messaging.nats.jet-stream.key-value-stores[i].storage-type

The storage type (File or Memory)

quarkus.messaging.nats.jet-stream.key-value-stores[i].max-bucket-size

The maximum number of bytes for this bucket

quarkus.messaging.nats.jet-stream.key-value-stores[i].max-history-per-key

The maximum number of history for any one key. Includes the current value.

quarkus.messaging.nats.jet-stream.key-value-stores[i].max-value-size

The maximum size for an individual value in the bucket

quarkus.messaging.nats.jet-stream.key-value-stores[i].ttl

The maximum age for a value in this bucket

quarkus.messaging.nats.jet-stream.key-value-stores[i].replicas

The number of replicas for this bucket

quarkus.messaging.nats.jet-stream.key-value-stores[i].compressed

Whether to use compression

Channel configuration

Table 3. Subscriber processor attributes

mp.messaging.outgoing.[channel-name].stream

The stream to subscribe messages to

mp.messaging.outgoing.[channel-name].subject

The subject to subscribe messages to

mp.messaging.outgoing.[channel-name].trace-enabled

Enable traces for subscriber

Table 4. Publisher processor attributes

mp.messaging.incoming.[channel-name].name

The name of the NATS consumer

mp.messaging.incoming.[channel-name].stream

The stream to publish messages from

mp.messaging.incoming.[channel-name].subject

The subject to publish messages from

mp.messaging.incoming.[channel-name].publisher-type

The publisher type (Pull, Push)

mp.messaging.incoming.[channel-name].payload-type

The class name of the payload type

mp.messaging.incoming.[channel-name].durable

Sets the durable name for the consumer

mp.messaging.incoming.[channel-name].filter-subjects

A comma separated list of subjects that overlap with the subjects bound to the stream to filter delivery to subscribers

mp.messaging.incoming.[channel-name].ack-wait

The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered

mp.messaging.incoming.[channel-name].deliver-policy

The point in the stream to receive messages from, either DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequence, DeliverByStartTime, or DeliverLastPerSubject

mp.messaging.incoming.[channel-name].description

A description of the consumer

mp.messaging.incoming.[channel-name].inactive-threshold

Duration that instructs the server to cleanup consumers that are inactive for that long

mp.messaging.incoming.[channel-name].description

A description of the consumer

mp.messaging.incoming.[channel-name].max-ack-pending

Defines the maximum number of messages, without an acknowledgement, that can be outstanding

mp.messaging.incoming.[channel-name].max-deliver

The maximum number of times a specific message delivery will be attempted

mp.messaging.incoming.[channel-name].replay-policy

If the policy is ReplayOriginal, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. If the policy is ReplayInstant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.

mp.messaging.incoming.[channel-name].replicas

Sets the number of replicas for the consumer’s state. By default, when the value is set to zero, consumers inherit the number of replicas from the stream

mp.messaging.incoming.[channel-name].memory-storage

If set, forces the consumer state to be kept in memory rather than inherit the storage type of the stream (file in this case).

mp.messaging.incoming.[channel-name].back-off

The timing of re-deliveries as a comma-separated list of durations

mp.messaging.incoming.[channel-name].retry-backoff

The retry backoff in milliseconds for retry publishing messages

mp.messaging.incoming.[channel-name].pull.batch-size

The size of batch of messages to be pulled in pull mode

mp.messaging.incoming.[channel-name].pull.max-waiting

The maximum number of waiting pull requests

mp.messaging.incoming.[channel-name].pull.max-expires

The maximum duration a single pull request will wait for messages to be available to pull. NATS require this to be 1000ms or above. Default is 3000ms.

mp.messaging.incoming.[channel-name].push.ordered

Flag indicating whether this subscription should be ordered

mp.messaging.incoming.[channel-name].push.deliver-group

The optional deliver group to join

mp.messaging.incoming.[channel-name].push.flow-control

Enables per-subscription flow control using a sliding-window protocol. This protocol relies on the server and client exchanging messages to regulate when and how many messages are pushed to the client. This one-to-one flow control mechanism works in tandem with the one-to-many flow control imposed by MaxAckPending across all subscriptions bound to a consumer.

mp.messaging.incoming.[channel-name].push.deliver-group

The optional deliver group to join

mp.messaging.incoming.[channel-name].push.idle-heart-beat

If the idle heartbeat period is set, the server will regularly send a status message to the client (i.e. when the period has elapsed) while there are no new messages to send. This lets the client know that the JetStream service is still up and running, even when there is no activity on the stream. The message status header will have a code of 100. Unlike FlowControl, it will have no reply to address. It may have a description such \"Idle Heartbeat\". Note that this heartbeat mechanism is all handled transparently by supported clients and does not need to be handled by the application.

mp.messaging.incoming.[channel-name].push.rate-limit

Used to throttle the delivery of messages to the consumer, in bits per second.

mp.messaging.incoming.[channel-name].push.headers-only

Delivers only the headers of messages in the stream and not the bodies. Additionally adds Nats-Msg-Size header to indicate the size of the removed payload.

NATS JetStream

This extension utilizes the NATS JetStream client to connect to a NATS JetStream broker.

Further documentation can be found at:

Reactive Messaging

This extension utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.