Quarkus Reactive Messaging NATS JetStream
This extension allows the use of NATS JetStream inside a Quarkus application, in both JVM and native mode.
It adds a new connector type, quarkus-jetstream, to SmallRye Reactive Messaging; the connector uses the NATS client.
Installation
To use this extension, first add the io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream
extension to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
    <groupId>io.quarkiverse.reactivemessaging.nats-jetstream</groupId>
    <artifactId>quarkus-messaging-nats-jetstream</artifactId>
    <version>3.17.10</version>
</dependency>
Then configure your application by adding the NATS JetStream connector type:
# Inbound
mp.messaging.incoming.[channel-name].connector=quarkus-jetstream

# Outbound
mp.messaging.outgoing.[channel-name].connector=quarkus-jetstream
Receiving messages from NATS JetStream
Let’s imagine you have a NATS JetStream broker running and accessible at localhost:4242. Configure your application to receive NATS messages on the data channel, from the stream named test and the subject named data, as follows:
quarkus.messaging.nats.servers=nats://localhost:4242
quarkus.messaging.nats.username=guest
quarkus.messaging.nats.password=guest
quarkus.messaging.nats.ssl-enabled=false

# Streams and subjects are auto-created by default based on channel configuration
quarkus.messaging.nats.jet-stream.auto-configure=true
quarkus.messaging.nats.jet-stream.streams[0].name=test
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=data

mp.messaging.incoming.data.connector=quarkus-jetstream
mp.messaging.incoming.data.stream=test
mp.messaging.incoming.data.subject=data
Then, your application receives Message<Data>. You can consume the payload directly:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DataConsumer {

    @Incoming("data")
    public void consume(Data data) {
        // process your data.
    }
}
If you want more examples, please take a look at the tests of the extension.
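The outbound direction can be configured in the same way. The fragment below is a minimal sketch, assuming a hypothetical channel named data-out that publishes to the test stream and data subject configured above. It uses only the connector, stream and subject attributes listed under Channel configuration.

```properties
# Hypothetical outgoing channel "data-out"; the channel name is an assumption for illustration
mp.messaging.outgoing.data-out.connector=quarkus-jetstream
mp.messaging.outgoing.data-out.stream=test
mp.messaging.outgoing.data-out.subject=data
```

Application code would then write to this channel through the usual Reactive Messaging mechanisms, for example a method annotated with @Outgoing("data-out").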
Configuration
quarkus.messaging.nats.servers |
A comma-separated list of URIs (nats://{host}:{port}) to use for establishing the initial connection to the NATS cluster. |
quarkus.messaging.nats.username |
The username to connect to the NATS server |
quarkus.messaging.nats.password |
The password to connect to the NATS server |
quarkus.messaging.nats.token |
The token to connect to the NATS server |
quarkus.messaging.nats.credential-path |
The path to the credentials file to connect to the NATS server |
quarkus.messaging.nats.ssl-enabled |
Whether to enable SSL/TLS secure connections to the NATS server |
quarkus.messaging.nats.connection-timeout |
The connection timeout in milliseconds |
quarkus.messaging.nats.error-listener |
The classname for the error listener |
quarkus.messaging.nats.buffer-size |
The size in bytes to make buffers for connections |
quarkus.messaging.nats.keystore-path |
The path to the keystore file |
quarkus.messaging.nats.keystore-password |
The password for the keystore |
quarkus.messaging.nats.truststore-path |
The path to the trust store file |
quarkus.messaging.nats.truststore-password |
The password for the trust store |
quarkus.messaging.nats.tls-algorithm |
The TLS algorithm |
quarkus.messaging.nats.connection-attempts |
The maximum number of attempts to reconnect to NATS |
quarkus.messaging.nats.connection-backoff |
The back-off delay between attempts to reconnect to NATS |
quarkus.messaging.nats.jet-stream.auto-configure |
Autoconfigure stream and subjects based on channel configuration |
quarkus.messaging.nats.jet-stream.trace |
Enable tracing |
quarkus.messaging.nats.jet-stream.streams[i].replicas |
The number of replicas to keep for each message. The default value is 1 |
quarkus.messaging.nats.jet-stream.streams[i].storage-type |
The storage type for stream data (File or Memory) |
quarkus.messaging.nats.jet-stream.streams[i].retention-policy |
Declares the retention policy for the stream (Limits or Interest) |
quarkus.messaging.nats.jet-stream.streams[i].name |
Name of the stream to set up if auto-configure is enabled |
quarkus.messaging.nats.jet-stream.streams[i].subjects[n] |
Name of the subject in the stream to set up if auto-configure is enabled |
quarkus.messaging.nats.jet-stream.streams[i].overwrite |
Whether to overwrite the stream if it already exists |
quarkus.messaging.nats.jet-stream.streams[i].description |
Description of stream |
quarkus.messaging.nats.jet-stream.streams[i].compression-option |
The compression option for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-consumers |
The maximum number of consumers for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-messages |
The maximum messages for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-messages-per-subject |
The maximum messages per subject for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-bytes |
The maximum number of bytes for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-age |
The maximum message age for this stream |
quarkus.messaging.nats.jet-stream.streams[i].maximum-message-size |
The maximum message size for this stream |
quarkus.messaging.nats.jet-stream.streams[i].template-owner |
The template json for this stream |
quarkus.messaging.nats.jet-stream.streams[i].discard-policy |
The discard policy for this stream |
quarkus.messaging.nats.jet-stream.streams[i].duplicate-window |
The duplicate checking window for the stream. Duration.ZERO means duplicate checking is not enabled |
quarkus.messaging.nats.jet-stream.streams[i].allow-rollup |
The flag indicating if the stream allows rollup |
quarkus.messaging.nats.jet-stream.streams[i].allow-direct |
The flag indicating if the stream allows direct message access |
quarkus.messaging.nats.jet-stream.streams[i].mirror-direct |
The flag indicating if the stream allows higher performance and unified direct access for mirrors as well |
quarkus.messaging.nats.jet-stream.streams[i].deny-delete |
The flag indicating if deny delete is set for the stream |
quarkus.messaging.nats.jet-stream.streams[i].deny-purge |
The flag indicating if deny purge is set for the stream |
quarkus.messaging.nats.jet-stream.streams[i].discard-new-per-subject |
Whether discard policy with max message per subject is applied per subject |
quarkus.messaging.nats.jet-stream.streams[i].first-sequence |
The first sequence used in the stream |
quarkus.messaging.nats.jet-stream.key-value-stores[i].bucket-name |
Name of Key-Value store |
quarkus.messaging.nats.jet-stream.key-value-stores[i].description |
Description of Key-Value store |
quarkus.messaging.nats.jet-stream.key-value-stores[i].storage-type |
The storage type (File or Memory) |
quarkus.messaging.nats.jet-stream.key-value-stores[i].max-bucket-size |
The maximum number of bytes for this bucket |
quarkus.messaging.nats.jet-stream.key-value-stores[i].max-history-per-key |
The maximum number of history entries kept for any one key, including the current value. |
quarkus.messaging.nats.jet-stream.key-value-stores[i].max-value-size |
The maximum size for an individual value in the bucket |
quarkus.messaging.nats.jet-stream.key-value-stores[i].ttl |
The maximum age for a value in this bucket |
quarkus.messaging.nats.jet-stream.key-value-stores[i].replicas |
The number of replicas for this bucket |
quarkus.messaging.nats.jet-stream.key-value-stores[i].compressed |
Whether to use compression |
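To illustrate how the indexed properties above fit together, the following sketch declares one stream and one Key-Value store. All names and values (orders, settings, the limits and timeouts) are assumptions chosen for the example. The storage and retention values use the spellings given in the descriptions above and should be checked against the extension version in use.

```properties
# Connection timeout is documented in milliseconds; the values here are illustrative
quarkus.messaging.nats.connection-timeout=5000
quarkus.messaging.nats.connection-attempts=5

# One stream with two subjects, set up only when auto-configure is enabled
quarkus.messaging.nats.jet-stream.auto-configure=true
quarkus.messaging.nats.jet-stream.streams[0].name=orders
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=orders.created
quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=orders.cancelled
quarkus.messaging.nats.jet-stream.streams[0].replicas=1
quarkus.messaging.nats.jet-stream.streams[0].storage-type=File
quarkus.messaging.nats.jet-stream.streams[0].retention-policy=Limits
quarkus.messaging.nats.jet-stream.streams[0].maximum-messages=100000

# One Key-Value bucket kept in memory, retaining up to five entries per key
quarkus.messaging.nats.jet-stream.key-value-stores[0].bucket-name=settings
quarkus.messaging.nats.jet-stream.key-value-stores[0].storage-type=Memory
quarkus.messaging.nats.jet-stream.key-value-stores[0].max-history-per-key=5
```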
Channel configuration
mp.messaging.outgoing.[channel-name].stream |
The stream to publish messages to |
mp.messaging.outgoing.[channel-name].subject |
The subject to publish messages to |
mp.messaging.outgoing.[channel-name].trace-enabled |
Enable traces for subscriber |
mp.messaging.incoming.[channel-name].name |
The name of the NATS consumer |
mp.messaging.incoming.[channel-name].stream |
The stream to receive messages from |
mp.messaging.incoming.[channel-name].subject |
The subject to receive messages from |
mp.messaging.incoming.[channel-name].publisher-type |
The publisher type (Pull, Push) |
mp.messaging.incoming.[channel-name].payload-type |
The class name of the payload type |
mp.messaging.incoming.[channel-name].durable |
Sets the durable name for the consumer |
mp.messaging.incoming.[channel-name].filter-subjects |
A comma-separated list of subjects that overlap with the subjects bound to the stream to filter delivery to subscribers |
mp.messaging.incoming.[channel-name].ack-wait |
The duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. If an ack is not received in time, the message will be redelivered |
mp.messaging.incoming.[channel-name].deliver-policy |
The point in the stream to receive messages from, either DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequence, DeliverByStartTime, or DeliverLastPerSubject |
mp.messaging.incoming.[channel-name].description |
A description of the consumer |
mp.messaging.incoming.[channel-name].inactive-threshold |
Duration that instructs the server to clean up consumers that are inactive for that long |
mp.messaging.incoming.[channel-name].max-ack-pending |
Defines the maximum number of messages, without an acknowledgement, that can be outstanding |
mp.messaging.incoming.[channel-name].max-deliver |
The maximum number of times a specific message delivery will be attempted |
mp.messaging.incoming.[channel-name].replay-policy |
If the policy is ReplayOriginal, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. If the policy is ReplayInstant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages. |
mp.messaging.incoming.[channel-name].replicas |
Sets the number of replicas for the consumer’s state. By default, when the value is set to zero, consumers inherit the number of replicas from the stream |
mp.messaging.incoming.[channel-name].memory-storage |
If set, forces the consumer state to be kept in memory rather than inherit the storage type of the stream (file in this case). |
mp.messaging.incoming.[channel-name].back-off |
The timing of re-deliveries as a comma-separated list of durations |
mp.messaging.incoming.[channel-name].retry-backoff |
The back-off in milliseconds between retries when publishing messages |
mp.messaging.incoming.[channel-name].pull.batch-size |
The size of batch of messages to be pulled in pull mode |
mp.messaging.incoming.[channel-name].pull.re-pull-at |
The point in the current batch to tell the server to start the next batch |
mp.messaging.incoming.[channel-name].pull.max-waiting |
The maximum number of waiting pull requests |
mp.messaging.incoming.[channel-name].pull.max-expires |
The maximum duration a single pull request will wait for messages to be available to pull |
mp.messaging.incoming.[channel-name].push.ordered |
Flag indicating whether this subscription should be ordered |
mp.messaging.incoming.[channel-name].push.deliver-group |
The optional deliver group to join |
mp.messaging.incoming.[channel-name].push.flow-control |
Enables per-subscription flow control using a sliding-window protocol. This protocol relies on the server and client exchanging messages to regulate when and how many messages are pushed to the client. This one-to-one flow control mechanism works in tandem with the one-to-many flow control imposed by MaxAckPending across all subscriptions bound to a consumer. |
mp.messaging.incoming.[channel-name].push.idle-heart-beat |
If the idle heartbeat period is set, the server will regularly send a status message to the client (i.e. when the period has elapsed) while there are no new messages to send. This lets the client know that the JetStream service is still up and running, even when there is no activity on the stream. The message status header will have a code of 100. Unlike FlowControl, it will have no reply-to address. It may have a description such as "Idle Heartbeat". Note that this heartbeat mechanism is all handled transparently by supported clients and does not need to be handled by the application. |
mp.messaging.incoming.[channel-name].push.rate-limit |
Used to throttle the delivery of messages to the consumer, in bits per second. |
mp.messaging.incoming.[channel-name].push.headers-only |
Delivers only the headers of messages in the stream and not the bodies. Additionally adds Nats-Msg-Size header to indicate the size of the removed payload. |
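As a sketch of how the channel attributes above combine, the fragment below configures the data channel from the earlier example as a durable pull consumer. The consumer name data-consumer and all numeric values are assumptions for illustration, and inbound.Data refers to the payload class used in the consumer example. Duration-valued attributes such as ack-wait are left out because their expected format is not stated here.

```properties
mp.messaging.incoming.data.connector=quarkus-jetstream
mp.messaging.incoming.data.stream=test
mp.messaging.incoming.data.subject=data

# Durable pull consumer; "data-consumer" is a hypothetical name
mp.messaging.incoming.data.publisher-type=Pull
mp.messaging.incoming.data.durable=data-consumer
mp.messaging.incoming.data.payload-type=inbound.Data

# Illustrative delivery limits
mp.messaging.incoming.data.max-deliver=3
mp.messaging.incoming.data.max-ack-pending=1000

# Pull-specific setting: number of messages requested per batch
mp.messaging.incoming.data.pull.batch-size=100
```

A push consumer would instead set publisher-type=Push and use the push.* attributes, such as push.deliver-group.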
NATS JetStream
This extension utilizes the NATS JetStream client to connect to a NATS JetStream broker.
Reactive Messaging
This extension utilizes SmallRye Reactive Messaging to build data streaming applications.
To go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.