Using WebSockets with Reactive Messaging
This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to
consume and produce messages via WebSockets.
WebSocket support is part of the Reactive Messaging HTTP extension (quarkus-reactive-messaging-http).
Prerequisites
To complete this guide, you need:
- less than 15 minutes
- an IDE
- JDK 11+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.8.1+
- GraalVM, Docker or Podman installed if you want to run in native mode
Architecture
In this guide we will implement a service, namely CostConverter, that consumes messages
with costs in multiple currencies and converts each cost to its value in Euro.
To let a user easily try out the service, we will implement an HTTP resource summing up the costs
(CostCollector), and a simple web page to add new costs and watch the sum.
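The data flow described above can be sketched in plain Java, without any messaging framework. This is only an illustration of the architecture, not the guide's implementation: the class name CostPipelineSketch, the nested Cost stand-in, and the sumInEuro helper are hypothetical, and the conversion ratios are the sample values used later in this guide.

```java
import java.util.List;
import java.util.Map;

public class CostPipelineSketch {

    // Hypothetical stand-in for the Cost class defined later in the guide
    public static final class Cost {
        public final double value;
        public final String currency;

        public Cost(double value, String currency) {
            this.value = value;
            this.currency = currency;
        }
    }

    // Sample ratios, identical to the ones the guide's converter uses
    private static final Map<String, Double> RATIOS =
            Map.of("CHF", 0.93, "USD", 0.84, "PLN", 0.22, "EUR", 1.0);

    // Converter stage: convert to EUR when the ratio is known, otherwise pass through
    public static Cost convert(Cost cost) {
        Double ratio = RATIOS.get(cost.currency.toUpperCase());
        return ratio == null ? cost : new Cost(ratio * cost.value, "EUR");
    }

    // Collector stage: sum only the costs that ended up in EUR
    public static double sumInEuro(List<Cost> costs) {
        return costs.stream()
                .map(CostPipelineSketch::convert)
                .filter(c -> "EUR".equals(c.currency))
                .mapToDouble(c -> c.value)
                .sum();
    }

    public static void main(String[] args) {
        // GBP has no known ratio, so it passes through unconverted and is not summed
        List<Cost> incoming = List.of(
                new Cost(10, "USD"), new Cost(5, "EUR"), new Cost(3, "GBP"));
        System.out.println("Total in EUR: " + sumInEuro(incoming));
    }
}
```

In the actual application the two stages live in separate beans and are connected by WebSocket channels rather than by a direct method call.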
Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/quarkiverse/quarkus-reactive-messaging-http.git, or download an archive.
The solution is located in the websockets-quickstart directory.
Creating the Maven Project
First, we need a new project. Create a new project with the following command:
mvn io.quarkus:quarkus-maven-plugin:3.16.2:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-websockets-quickstart \
-Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
-DnoCode
cd reactive-messaging-websockets-quickstart
This command generates a Maven project with the Reactive Messaging HTTP connector and the RESTEasy Reactive extension with Jackson support.
The Converter
Create the src/main/java/org/acme/reactivews/CostConverter.java file, with the following content:
package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.0);
    }

    @Incoming("incoming-costs") // (1)
    @Outgoing("outgoing-costs") // (2)
    Cost convert(Cost cost) { // (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            // unknown currency: pass the cost through unchanged
            return cost;
        }
        return new Cost(conversionRatio * cost.getValue(), "EUR");
    }
}
(1) Consume messages from the incoming-costs stream.
(2) Dispatch returned values to the outgoing-costs stream.
(3) Consume an event with a payload of type Cost and produce another Cost, with the value converted to Euro if the conversion ratio is known. When consuming an arbitrary object, the reactive-messaging-http extension attempts to deserialize the request body as a JSON object.

Note: Unlike the outbound HTTP connector, the outbound WebSocket connector does not check whether the message is consumed by the remote endpoint. A failure to receive a message may go unreported, e.g. if the remote side closes the WebSocket connection while the message is in flight.
Let’s define the Cost class:
package org.acme.reactivews;

public class Cost {

    private double value;
    private String currency;

    public Cost(double value, String currency) {
        this.value = value;
        this.currency = currency;
    }

    public Cost() {
    }

    public void setValue(double value) {
        this.value = value;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }

    public double getValue() {
        return value;
    }

    public String getCurrency() {
        return currency;
    }
}
In the next step, we will create configurations for both streams in the application.properties file.
Configuring the WebSocket connector
We need to configure the WebSocket connector. This is done in the application.properties file.
The keys are structured as follows:
mp.messaging.[outgoing|incoming].{channel-name}.{property}=value
The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:
- incoming-costs → an inbound WebSocket that receives costs
- outgoing-costs → an outbound WebSocket that pushes converted costs
mp.messaging.outgoing.outgoing-costs.connector=quarkus-websocket
# the WebSockets are exposed on the same port as HTTP
# for non-test profiles, it is quarkus.http.port...
mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.port}/cost-collector
# for the test profile it is quarkus.http.test-port
%test.mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.test-port}/cost-collector
mp.messaging.incoming.incoming-costs.connector=quarkus-websocket
# the incoming-costs channel will be fed via a Web Socket endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs
mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket
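To make the key layout concrete, the following sketch splits a configuration key into its direction, channel name, and property. It is purely illustrative: the ChannelKeySketch class and its parse method are hypothetical and are not part of Quarkus or SmallRye. It also assumes that neither the channel name nor the property contains a dot and that the key carries no profile prefix such as %test.

```java
import java.util.Arrays;

public class ChannelKeySketch {

    private static final String PREFIX = "mp.messaging.";

    /**
     * Splits "mp.messaging.<direction>.<channel-name>.<property>" into
     * { direction, channel-name, property }.
     */
    public static String[] parse(String key) {
        if (!key.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a messaging key: " + key);
        }
        String rest = key.substring(PREFIX.length());
        int firstDot = rest.indexOf('.');
        int lastDot = rest.lastIndexOf('.');
        if (firstDot < 0 || firstDot == lastDot) {
            throw new IllegalArgumentException("Expected direction, channel and property: " + key);
        }
        String direction = rest.substring(0, firstDot);
        String channel = rest.substring(firstDot + 1, lastDot);
        String property = rest.substring(lastDot + 1);
        return new String[] { direction, channel, property };
    }

    public static void main(String[] args) {
        // prints [incoming, incoming-costs, path]
        System.out.println(Arrays.toString(parse("mp.messaging.incoming.incoming-costs.path")));
    }
}
```

Read this way, the incoming-costs and collector channels are both inbound WebSocket endpoints, while outgoing-costs is the only outbound one and points at the collector's path.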
The CostCollector
To illustrate that converting messages and passing them through works, let’s add a CostCollector class that consumes the WebSocket messages and exposes information about the sum of collected costs via REST:
package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

@Path("/collected-costs")
@ApplicationScoped
public class CostCollector {

    private double sum;

    @GET
    // expose the sum of the collected costs
    public synchronized double getCosts() {
        return sum;
    }

    @Incoming("collector")
    // consume costs from collector channel
    synchronized void collect(Cost cost) {
        if ("EUR".equals(cost.getCurrency())) {
            sum += cost.getValue();
        }
    }
}
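The collector channel also has to be configured in application.properties. These two entries already appear at the end of the configuration listed earlier, so there is nothing to add if you copied it in full: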
mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket
The HTML page
To conveniently interact with the application, let’s create a simple web page.
The page provides a form to add costs and displays the current sum of costs.
It periodically refreshes the sum by requesting it from the /collected-costs REST endpoint.
Put the following content into the src/main/resources/META-INF/resources/index.html file:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Costs</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Add a cost</h2>
<div>
<div>
<label for="value">Value</label>
<input type="text" id="value">
</div>
<div>
<label for="currency">Currency</label>
<select id="currency">
<option value="CHF">Swiss franc</option>
<option value="USD">United States dollar</option>
<option value="PLN">Polish złoty</option>
<option value="EUR">Euro</option>
</select>
</div>
<input type="button" onclick="add()" value="Add">
</div>
<h2>Summary</h2>
<div class="row">
<p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script>
var webSocket = new WebSocket(`ws://${document.location.host}/costs`);
add = function() {
const cost = {
value: document.getElementById('value').value,
currency: document.getElementById('currency').value
};
webSocket.send(JSON.stringify(cost));
}
updateCost = function() {
fetch('collected-costs').then(response => response.text()).then(sum =>
document.getElementById('content').textContent=sum
);
}
window.setInterval(updateCost, 500);
</script>
</html>
Get it running
Run the application using:
./mvnw quarkus:dev
Open http://localhost:8080/index.html in your browser.
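If you prefer to exercise the running application without a browser, a small client based on the JDK's java.net.http API can do what the web page does. This is a minimal sketch, assuming the application listens on localhost:8080 as above. The CostClientSketch class and its costJson helper are hypothetical names, the JSON is built by hand and does not escape the currency string, and the 500 ms pause is an arbitrary wait for the message to pass through both channels.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.Locale;

public class CostClientSketch {

    // Builds the JSON payload matching the Cost class: {"value":..., "currency":"..."}
    public static String costJson(double value, String currency) {
        return String.format(Locale.ROOT, "{\"value\":%s,\"currency\":\"%s\"}", value, currency);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Connect to the inbound WebSocket endpoint configured for incoming-costs
        WebSocket webSocket = client.newWebSocketBuilder()
                .buildAsync(URI.create("ws://localhost:8080/costs"), new WebSocket.Listener() {})
                .join();

        // Send one cost as a complete text message
        webSocket.sendText(costJson(10.0, "USD"), true).join();

        // Give the converter and collector a moment to process the message
        Thread.sleep(500);

        // Read the current sum from the REST endpoint exposed by CostCollector
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://localhost:8080/collected-costs"))
                .GET()
                .build();
        String sum = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("Collected sum in EUR: " + sum);

        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}
```

Run it while ./mvnw quarkus:dev is active. Each run adds another cost, so the reported sum grows between runs until the application restarts.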
Going further
WebSockets
All quarkus-websocket connector options:
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=ws://localhost:8234/
# Message serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of retries to make for sending a message to a remote websocket endpoint.
# A value greater than 0 is advised. Otherwise, a web socket timeout can result in a dropped message
# The default value is 1
mp.messaging.outgoing.<channelName>.maxRetries=1
# Configures the random factor when using back-off with maxAttempts > 1, 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.7
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=2s
# INCOMING
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-ws-endpoint
# Web socket endpoint buffers messages if a consumer is not able to keep up.
# This setting specifies the size of the buffer. 8 by default
mp.messaging.incoming.<channelName>.buffer-size=3
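To give an intuition for how delay and jitter interact on the outgoing side, here is a generic sketch of exponential back-off with a random jitter factor. It is not the connector's actual formula, which this guide does not specify. The BackOffSketch class, the delayFor method, and the doubling rule are assumptions made for illustration only.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

public class BackOffSketch {

    /**
     * Computes an illustrative wait before a retry.
     *
     * @param base    the configured delay, e.g. 2s
     * @param attempt 1-based retry number
     * @param jitter  the configured random factor, e.g. 0.7
     * @param random  a random number in [0, 1), passed in to keep the method deterministic
     */
    public static Duration delayFor(Duration base, int attempt, double jitter, double random) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Assumed rule: double the base delay with each further retry
        long exponentialMillis = base.toMillis() << (attempt - 1);
        // The jitter stretches the delay by up to (jitter * 100) percent
        double factor = 1.0 + jitter * random;
        return Duration.ofMillis(Math.round(exponentialMillis * factor));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(2);
        for (int attempt = 1; attempt <= 3; attempt++) {
            double random = ThreadLocalRandom.current().nextDouble();
            System.out.println("retry " + attempt + " waits " + delayFor(base, attempt, 0.7, random));
        }
    }
}
```

The point of the random factor is that several clients failing at the same moment do not all retry at the same instant.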
Reactive Messaging
This extension utilizes SmallRye Reactive Messaging to build data streaming applications.
If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.