Quarkus Reactive Messaging Nats Jetstream

This extension allow usage of NATS JetStream inside a Quarkus App, in JVM and Native mode.

The extension implements a new connector type quarkus-jetstream in SmallRye Reactive Messaging that will use the NATS client.

Installation

If you want to use this extension, you need to add the io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-reactive-messaging-nats-jetstream extension first to your build file.

For instance, with Maven, add the following dependency to your POM file:

<dependency>
    <groupId>io.quarkiverse.reactivemessaging.nats-jetstream</groupId>
    <artifactId>quarkus-messaging-nats-jetstream</artifactId>
    <version>3.25.0</version>
</dependency>

Then configure your application by adding the NATS JetStream connector type:

# Inbound
mp.messaging.incoming.[channel-name].connector=quarkus-jetstream

# Outbound
mp.messaging.outgoing.[channel-name].connector=quarkus-jetstream

Receiving messages from NATS JetStream

Let’s imagine you have a NATS JetStream broker running, and accessible using the localhost:4242 address. Configure your application to receive NATS messages on the data channel from the stream named: test and the subject named: data as follows:

quarkus.messaging.nats.connection.servers=nats://localhost:4242
quarkus.messaging.nats.connection.username=guest
quarkus.messaging.nats.connection.password=guest
quarkus.messaging.nats.connection.ssl-enabled=false

# Streams and subjects are auto-created by default based on channel configuration
quarkus.messaging.nats.auto-configure=true
quarkus.messaging.nats.streams.test.pull-consumers.data-consumer.consumer-configuration.filter-subjects=data

mp.messaging.incoming.data.connector=quarkus-jetstream
mp.messaging.incoming.data.stream=test
mp.messaging.incoming.data.consumer=data-consumer

Then, your application receives Message<Data>. You can consumes the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DataConsumer {

    @Incoming("data")
    public void consume(Data data) {
        // process your data.
    }

}

If you want more examples, please take a look at the tests of the extension.

Configuration

Table 1. NATS connection configuration
Property Name Description Type Default Value

quarkus.messaging.nats.connection.servers

A comma-separated list of URI’s nats://{host}:{port} to use for establishing the initial connection to the NATS cluster.

String

nats://localhost:4222

quarkus.messaging.nats.connection.username

The username to connect to the NATS server

String

quarkus.messaging.nats.connection.password

The password to connect to the NATS server

String

quarkus.messaging.nats.connection.token

The token to connect to the NATS server

String

quarkus.messaging.nats.connection.credential-path

The path to the credentials file to connect to the NATS server

String

quarkus.messaging.nats.connection.ssl-enabled

Whether to enable SSL/TLS secure connections to the NATS server

Boolean

false

quarkus.messaging.nats.connection.connection-timeout

The connection timeout

Duration

quarkus.messaging.nats.connection.error-listener

The classname for the error listener

String

quarkus.messaging.nats.connection.buffer-size

The size in bytes to make buffers for connections

Integer

quarkus.messaging.nats.connection.tls-algorithm

The TLS algorithm

String

quarkus.messaging.nats.connection.connection-attempts

The maximum number of attempts to attempt to re-connect to NATS

Integer

-1 (unlimited)

quarkus.messaging.nats.connection.connection-backoff

Back-off delay between to attempt to re-connect to NATS

Duration

1s

quarkus.messaging.nats.connection.tls-configuration-name

The name of the TLS configuration (bucket) used for client authentication in the TLS registry

String

Table 2. NATS JetStream configuration
Property Name Description Type Default Value

quarkus.messaging.nats.auto-configure

Autoconfigure stream and subjects based on channel configuration

Boolean

true

quarkus.messaging.nats.trace

Enable tracing

Boolean

true

quarkus.messaging.nats.streams.[stream-name].replicas

The number of replicas a message must be stored.

Integer

1

quarkus.messaging.nats.streams.[stream-name].storage-type

The storage type for stream data

File or Memory

File

quarkus.messaging.nats.streams.[stream-name].retention-policy, Declares the retention policy for the stream

Limits or Interest

Interest

quarkus.messaging.nats.streams.[stream-name].subjects

A comma separated list of the subject names in stream to setup if auto-configure is enabled

String

quarkus.messaging.nats.streams.[stream-name].description

Description of stream

String

quarkus.messaging.nats.streams.[stream-name].compression-option

The compression option for this stream

None or S2

None

quarkus.messaging.nats.streams.[stream-name].maximum-consumers

The maximum number of consumers for this stream

Long

quarkus.messaging.nats.streams.[stream-name].maximum-messages

The maximum messages for this stream

Long

quarkus.messaging.nats.streams.[stream-name].maximum-messages-per-subject

The maximum messages per subject for this stream

Long

quarkus.messaging.nats.streams.[stream-name].maximum-bytes

The maximum number of bytes for this stream

Long

quarkus.messaging.nats.streams.[stream-name].maximum-age

The maximum message age for this stream

Duration

quarkus.messaging.nats.streams.[stream-name].maximum-message-size

The maximum message size for this stream

Integer

quarkus.messaging.nats.streams.[stream-name].template-owner

The template json for this stream

String

quarkus.messaging.nats.streams.[stream-name].discard-policy

The discard policy for this stream

New or Old

quarkus.messaging.nats.streams.[stream-name].duplicate-window

The duplicate checking window stream configuration. Duration. ZERO means duplicate checking is not enabled

Duration

quarkus.messaging.nats.streams.[stream-name].allow-rollup

The flag indicating if the stream allows rollup

Boolean

quarkus.messaging.nats.streams.[stream-name].allow-direct

The flag indicating if the stream allows direct message access

Boolean

quarkus.messaging.nats.streams.[stream-name].mirror-direct

The flag indicating if the stream allows higher performance and unified direct access for mirrors as well

Boolean

quarkus.messaging.nats.streams.[stream-name].deny-delete

The flag indicating if deny delete is set for the stream

Boolean

quarkus.messaging.nats.streams.[stream-name].deny-purge

The flag indicating if deny purge is set for the stream

Boolean

quarkus.messaging.nats.streams.[stream-name].discard-new-per-subject

Whether discard policy with max message per subject is applied per subject

Boolean

quarkus.messaging.nats.streams.[stream-name].first-sequence

The first sequence used in the stream

Long

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.description

Description of pull consumer

String

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.payload-type

The class name of the payload type

String

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.durable

Set to true if the consumer should be durable

Boolean

false

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.filter-subjects

A comma separated list of subjects that overlap with the subjects bound to the stream to filter delivery to subscribers

String

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.ack-wait

The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered

Duration

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.deliver-policy

The point in the stream to receive messages from

All, Last, New, ByStartSequence, ByStartTime or LastPerSubject

All

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.inactive-threshold

Duration that instructs the server to cleanup consumers that are inactive for that long

Duration

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.max-ack-pending

Defines the maximum number of messages, without an acknowledgement, that can be outstanding

Long

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.max-deliver

The maximum number of times a specific message delivery will be attempted

Long

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.replay-policy

If the policy is Original, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. If the policy is Instant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.

Instant or Original

Instant

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.replicas

Sets the number of replicas for the consumer’s state. By default, when the value is set to zero, consumers inherit the number of replicas from the stream

Integer

0

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.memory-storage

If set, forces the consumer state to be kept in memory rather than inherit the storage type of the stream (file in this case).

Boolean

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.back-off

The timing of re-deliveries as a comma-separated list of durations

Duration

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.acknowledge-timeout

The duration in milliseconds to wait for an ack confirmation

Duration

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.start-sequence

The start sequence

Long

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.start-time

The start time

ZonedDateTime

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.sample-frequency

The sample frequency

String

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.meta-data

The consumer metadata

Map<String, String>

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].consumer-configuration.pause-until

The pause until

ZonedDateTime

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].pull-configuration.pull.batch-size

The size of batch of messages to be pulled in pull mode

Integer

10

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].pull-configuration.max-waiting

The maximum number of waiting pull requests

Integer

quarkus.messaging.nats.streams.[stream-name].pull-consumers.[consumer-name].pull-configuration.max-expires

The maximum duration a single pull request will wait for messages to be available to pull. NATS require this to be 1000ms or above

Duration

1s

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.description

Description of pull consumer

String

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.payload-type

The class name of the payload type

String

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.durable

Sets the durable for the consumer

Boolean

false

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.filter-subjects

A comma separated list of subjects that overlap with the subjects bound to the stream to filter delivery to subscribers

String

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.ack-wait

The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.deliver-policy

The point in the stream to receive messages from

All, Last, New, ByStartSequence, ByStartTime or LastPerSubject

All

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.inactive-threshold

Duration that instructs the server to cleanup consumers that are inactive for that long

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.max-ack-pending

Defines the maximum number of messages, without an acknowledgement, that can be outstanding

Long

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.max-deliver

The maximum number of times a specific message delivery will be attempted

Long

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.replay-policy, "If the policy is ReplayOriginal, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. If the policy is ReplayInstant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages."

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.replicas

Sets the number of replicas for the consumer’s state. By default, when the value is set to zero, consumers inherit the number of replicas from the stream

Integer

0

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.memory-storage

If set, forces the consumer state to be kept in memory rather than inherit the storage type of the stream (file in this case).

Boolean

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.back-off

The timing of re-deliveries as a comma-separated list of durations

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.acknowledge-timeout

The duration in milliseconds to wait for an ack confirmation

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.start-sequence

The start sequence

Long

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.start-time

The start time

ZonedDateTime

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.sample-frequency

The sample frequency

String

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.meta-data

The consumer metadata

Map<String, String>

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].consumer-configuration.pause-until

The pause until

ZonedDateTime

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.push.ordered

Flag indicating whether this subscription should be ordered

Boolean

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.deliver-group

The optional deliver group to join

String

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.flow-control

Enables per-subscription flow control using a sliding-window protocol. This protocol relies on the server and client exchanging messages to regulate when and how many messages are pushed to the client. This one-to-one flow control mechanism works in tandem with the one-to-many flow control imposed by MaxAckPending across all subscriptions bound to a consumer.

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.idle-heart-beat

If the idle heartbeat period is set, the server will regularly send a status message to the client (i.e. when the period has elapsed) while there are no new messages to send. This lets the client know that the JetStream service is still up and running, even when there is no activity on the stream. The message status header will have a code of 100. Unlike FlowControl, it will have no reply to address. It may have a description such \"Idle Heartbeat\". Note that this heartbeat mechanism is all handled transparently by supported clients and does not need to be handled by the application.

Duration

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.rate-limit

Used to throttle the delivery of messages to the consumer, in bits per second.

Long

quarkus.messaging.nats.streams.[stream-name].push-consumers.[consumer-name].push-configuration.headers-only

Delivers only the headers of messages in the stream and not the bodies. Additionally adds Nats-Msg-Size header to indicate the size of the removed payload.

Boolean

false

Channel configuration

Table 3. Subscriber processor attributes
Property Name Description Type Default Value

mp.messaging.outgoing.[channel-name].stream

The stream to subscribe messages to

String

mp.messaging.outgoing.[channel-name].subject

The subject to subscribe messages to

String

Table 4. Publisher processor attributes
Property Name Description Type Default Value

mp.messaging.incoming.[channel-name].stream

The stream to publish messages from

String

mp.messaging.incoming.[channel-name].consumer

The name of the consumer to use

String

mp.messaging.incoming.[channel-name].retry-backoff

The retry backoff in milliseconds for retry publishing messagess

Long

10000

Key/Value Store configuration

Table 5. NATS Key/Value Store configuration
quarkus.messaging.nats.key-value-stores[bucket-name].description Description of Key-Value store String

quarkus.messaging.nats.key-value-stores[bucket-name].storage-type

The storage type

File or Memory

File

quarkus.messaging.nats.key-value-stores[bucket-name].max-bucket-size

The maximum number of bytes for this bucket

Long

quarkus.messaging.nats.key-value-stores[bucket-name].max-history-per-key

The maximum amount of history for any one key. Includes the current value.

Integer

quarkus.messaging.nats.key-value-stores[bucket-name].max-value-size

The maximum size for an individual value in the bucket

Integer

quarkus.messaging.nats.key-value-stores[bucket-name].ttl

The maximum age for a value in this bucket

Duration

quarkus.messaging.nats.key-value-stores[bucket-name].replicas

The number of replicas for this bucket

Integer

1

quarkus.messaging.nats.key-value-stores[bucket-name].compressed

Whether to use compression

Boolean

false

Dev service configuration

Property Name Description Type Default Value

quarkus.messaging.nats.jet-stream.devservices.enabled

If Dev Services for NATS JetStream has been explicitly enabled or disabled

Boolean

true

quarkus.messaging.nats.jet-stream.devservices.port

Fixed port the dev service will listen to

Integer

quarkus.messaging.nats.jet-stream.devservices.image-name

The image to use

String

nats:2.11

quarkus.messaging.nats.jet-stream.devservices.shared

Indicates if the NATS JetStream broker managed by Quarkus Dev Services is shared

Boolean

true

quarkus.messaging.nats.jet-stream.devservices.service-name

This property is used when you need multiple shared NATS JetStream brokers.

String

nats

NATS JetStream

This extension uses the NATS JetStream client to connect to a NATS JetStream broker.

Further documentation can be found at:

Reactive Messaging

This extension uses SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.