Using HTTP with Reactive Messaging
This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to consume and produce HTTP messages.
Prerequisites
To complete this guide, you need:
- less than 15 minutes
- an IDE
- JDK 11+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.8.1+
- GraalVM, Docker or Podman installed if you want to run in native mode.
Architecture
In this guide we will implement a service, namely CostConverter, that consumes HTTP messages with costs in multiple currencies and converts each cost to its value in Euro.
To let a user easily try out the service, we will implement an HTTP resource summing up the costs (CostCollector), and a simple web page to add new costs and watch the sum.
Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/quarkiverse/quarkus-reactive-messaging-http.git, or download an archive.
The solution is located in the http-quickstart directory.
Creating the Maven Project
First, we need a new project. Create a new project with the following command:
mvn io.quarkus.platform:quarkus-maven-plugin:3.16.2:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-http-quickstart \
-Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
-DnoCode
cd reactive-messaging-http-quickstart
This command generates a Maven project, importing the Reactive Messaging and HTTP connector extensions.
The Converter
Create the src/main/java/org/acme/reactivehttp/CostConverter.java file, with the following content:
package org.acme.reactivehttp;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Logger log = Logger.getLogger(CostConverter.class);

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.);
    }

    @Incoming("incoming-costs") // (1)
    @Outgoing("outgoing-costs") // (2)
    double convert(Cost cost) { // (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return 0.;
        }
        return conversionRatio * cost.getValue();
    }
}
(1) Consume messages from the incoming-costs stream.
(2) Dispatch returned values to the outgoing-costs stream.
(3) Consume an event with a payload of type Cost and produce a double. When consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object.
Let’s define the Cost class:
package org.acme.reactivehttp;

public class Cost {

    private double value;
    private String currency;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getCurrency() {
        return currency;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }
}
In the next step, we will create configurations for both streams in the application.properties file.
Configuring the HTTP connector
We need to configure the HTTP connector. This is done in the application.properties file.
The keys are structured as follows:
mp.messaging.[outgoing|incoming].<channel-name>.<property>=value
The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:
- incoming-costs → a source that receives costs
- outgoing-costs → a sink that receives converted costs
mp.messaging.outgoing.outgoing-costs.connector=quarkus-http
# here we are using a URL pointing to an endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector
# we need to use a different port for tests:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector
# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST
mp.messaging.incoming.incoming-costs.connector=quarkus-http
# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs
# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST
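To make the key layout described above more concrete, here is a minimal plain-Java sketch that splits a key into its direction, channel name and property. ChannelKeySketch, ChannelKey and parse are hypothetical names used only for illustration; they are not part of SmallRye Reactive Messaging or the HTTP connector. The sketch also assumes that channel names contain no dots and ignores profile prefixes such as %test.

```java
import java.util.Optional;

// Hypothetical helper: illustrates the layout of a channel property key only.
class ChannelKeySketch {

    static final class ChannelKey {
        final String direction; // "incoming" or "outgoing"
        final String channel;   // must match the @Incoming/@Outgoing value
        final String property;  // e.g. "connector", "url", "path", "method"

        ChannelKey(String direction, String channel, String property) {
            this.direction = direction;
            this.channel = channel;
            this.property = property;
        }
    }

    // Splits "mp.messaging.<direction>.<channel-name>.<property>" into its parts.
    static Optional<ChannelKey> parse(String key) {
        final String prefix = "mp.messaging.";
        if (!key.startsWith(prefix)) {
            return Optional.empty();
        }
        // Assumption: the channel name itself contains no dots.
        String[] parts = key.substring(prefix.length()).split("\\.", 3);
        if (parts.length != 3 || parts[1].isEmpty() || parts[2].isEmpty()) {
            return Optional.empty();
        }
        if (!parts[0].equals("incoming") && !parts[0].equals("outgoing")) {
            return Optional.empty();
        }
        return Optional.of(new ChannelKey(parts[0], parts[1], parts[2]));
    }

    public static void main(String[] args) {
        // prints "incoming / incoming-costs / path"
        parse("mp.messaging.incoming.incoming-costs.path").ifPresent(k ->
                System.out.println(k.direction + " / " + k.channel + " / " + k.property));
    }
}
```

Read this way, every property in the configuration above belongs to exactly one channel, and the channel segment is what ties it to the annotated method in CostConverter.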
Broadcast to multiple subscribers
If a single channel should deliver each message to multiple subscribers (methods annotated with @Incoming), an additional property needs to be set:
mp.messaging.incoming.incoming-costs.broadcast=true
This allows a single message to be delivered to all subscribers of that channel.
The CostCollector
To illustrate that converting messages and passing them through works, let’s add an endpoint that will receive the outgoing costs and sum them up. This is a regular JAX-RS endpoint.
package org.acme.reactivehttp;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {

    private double sum = 0;

    @POST
    public void consumeCost(String valueAsString) {
        sum += Double.parseDouble(valueAsString);
    }

    @GET
    public double getSum() {
        return sum;
    }
}
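The collector above keeps the sum in a plain double field, which is enough for trying out the quickstart. If requests can reach the endpoint concurrently, an update of the form sum += ... is not atomic and additions may be lost. A minimal sketch of a thread-safe accumulator, using the JDK's DoubleAdder, could look as follows. CostSumSketch is a hypothetical name, and the class is deliberately free of JAX-RS annotations so that it runs on its own.

```java
import java.util.concurrent.atomic.DoubleAdder;

// Hypothetical, framework-free variant of the summing logic in CostCollector.
class CostSumSketch {

    // DoubleAdder supports concurrent add() calls without losing updates.
    private final DoubleAdder sum = new DoubleAdder();

    // Mirrors consumeCost: the request body is the converted cost as text.
    void add(String valueAsString) {
        sum.add(Double.parseDouble(valueAsString));
    }

    // Mirrors getSum: returns the current total.
    double total() {
        return sum.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        CostSumSketch costs = new CostSumSketch();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            // Each worker simulates 1000 incoming POST requests.
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    costs.add("1.0");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(costs.total());
    }
}
```

In the resource itself, the same idea would amount to replacing the double field with a DoubleAdder and returning its sum() from the GET method.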
The HTML page
To conveniently interact with the application, let’s create a simple web page.
The page will provide a form to add costs and display the current sum of costs.
The page periodically updates the sum by requesting it from /cost-collector.
Create the src/main/resources/META-INF/resources/index.html file, with the following content:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="PLN">Polish złoty</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>
    <h2>Last cost</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
    </div>
</div>
</body>
<script>
    add = function() {
        // Read the form fields once and send them as a JSON cost
        var value = document.getElementById('value').value;
        var currency = document.getElementById('currency').value;
        var cost = {
            value: value,
            currency: currency
        };
        fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
    }
    updateCost = function() {
        fetch('cost-collector').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }
    window.setInterval(updateCost, 500);
</script>
</html>
Get it running
Run the application using:
./mvnw quarkus:dev
Open http://localhost:8080/index.html in your browser.
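The endpoints can also be exercised without the browser. The following sketch uses the JDK's java.net.http.HttpClient to post one cost to /costs and then read the sum from /cost-collector. PostCostSketch, toJson and buildRequest are hypothetical names. The sketch assumes the application is running in dev mode on the default port 8080, and it sets a Content-Type header for clarity, which the web page above does not set explicitly.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical command-line client for the quickstart endpoints.
class PostCostSketch {

    // Builds a JSON body with the two fields of the Cost class.
    // Assumption: the currency code contains no characters that need escaping.
    static String toJson(double value, String currency) {
        return "{\"value\":" + value + ",\"currency\":\"" + currency + "\"}";
    }

    // Builds a POST request against the path configured for incoming-costs.
    static HttpRequest buildRequest(String baseUrl, double value, String currency) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/costs"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(value, currency)))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        String baseUrl = "http://localhost:8080"; // assumed dev-mode address
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> posted = client.send(
                    buildRequest(baseUrl, 10.0, "USD"),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println("POST /costs -> HTTP " + posted.statusCode());

            HttpRequest sumRequest = HttpRequest.newBuilder(
                    URI.create(baseUrl + "/cost-collector")).GET().build();
            HttpResponse<String> sum = client.send(
                    sumRequest, HttpResponse.BodyHandlers.ofString());
            System.out.println("GET /cost-collector -> " + sum.body());
        } catch (IOException e) {
            // Typically means the application is not running on baseUrl.
            System.out.println("Could not reach " + baseUrl + ": " + e);
        }
    }
}
```

With the conversion ratio of 0.84 for USD defined in CostConverter, posting 10 USD should eventually increase the sum by about 8.4. The converted value travels through the outgoing-costs channel first, so the sum read right after the POST may not include it yet.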
Going further
HTTP connector options
All quarkus-http connector options:
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213
# Message payload serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero
# Zero by default
mp.messaging.outgoing.<channelName>.maxRetries=3
# Configures the random factor when using back-off with maxRetries > 0. 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.3
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=1s
# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.outgoing.<channelName>.method=PUT
# INCOMING
# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.incoming.<channelName>.method=POST
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint
# HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.
# 8 by default.
mp.messaging.incoming.<channelName>.buffer-size=13
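To give an intuition for how the delay and jitter options for outgoing channels interact, here is a generic sketch of exponential back-off with a random factor. It is not the connector's implementation, and the exact formula the connector applies is not stated in this guide. BackoffSketch and delayFor are hypothetical names, and both the doubling of the delay per attempt and the symmetric jitter range are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Random;

// Generic back-off illustration; NOT the connector's actual formula.
class BackoffSketch {

    /**
     * @param attempt    1-based retry number
     * @param base       initial delay (compare the "delay" option)
     * @param jitter     random factor between 0 and 1 (compare the "jitter" option)
     * @param randomUnit a random number in [0, 1), passed in to keep the method deterministic
     */
    static Duration delayFor(int attempt, Duration base, double jitter, double randomUnit) {
        // Assumption: the delay doubles with every failed attempt.
        double exponential = base.toMillis() * Math.pow(2, attempt - 1);
        // Assumption: the jitter shifts the delay by up to +/- (jitter * delay).
        double offset = exponential * jitter * (2 * randomUnit - 1);
        return Duration.ofMillis(Math.round(exponential + offset));
    }

    public static void main(String[] args) {
        Random random = new Random();
        Duration base = Duration.ofSeconds(1); // delay=1s
        double jitter = 0.3;                   // jitter=0.3
        int maxRetries = 3;                    // maxRetries=3
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            Duration delay = delayFor(attempt, base, jitter, random.nextDouble());
            System.out.println("retry " + attempt + " after about " + delay.toMillis() + " ms");
        }
    }
}
```

The point of the random factor is that several clients failing at the same moment do not all retry at the same instant.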
Cloud Event support
The HTTP connector supports binary-mode cloud event messages through Metadata.
If an incoming HTTP request contains headers prefixed with ce- or ce_, a CloudEventMetadata instance is included in the Message metadata.
If the outgoing Message metadata includes a CloudEventMetadata instance, the information contained there will be added as headers prefixed with ce- in the outgoing HTTP request.
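The header convention described above can be sketched in plain Java. The example below only shows how ce- and ce_ prefixed headers map to attribute names and back; it does not use the connector or the CloudEventMetadata type. CloudEventHeadersSketch, extractAttributes and toHeaders are hypothetical names, and the sample header values are made up for illustration.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the binary-mode header naming convention.
class CloudEventHeadersSketch {

    // Incoming direction: keep only ce- / ce_ headers and strip the prefix.
    static Map<String, String> extractAttributes(Map<String, String> headers) {
        Map<String, String> attributes = new TreeMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            // HTTP header names are case-insensitive, so normalize first.
            String name = header.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith("ce-") || name.startsWith("ce_")) {
                attributes.put(name.substring(3), header.getValue());
            }
        }
        return attributes;
    }

    // Outgoing direction: prefix every attribute name with ce-.
    static Map<String, String> toHeaders(Map<String, String> attributes) {
        Map<String, String> headers = new TreeMap<>();
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            headers.put("ce-" + attribute.getKey(), attribute.getValue());
        }
        return headers;
    }

    public static void main(String[] args) {
        // Made-up sample request headers.
        Map<String, String> requestHeaders = Map.of(
                "Content-Type", "application/json",
                "ce-id", "42",
                "ce-specversion", "1.0",
                "ce_type", "cost.added",
                "ce-source", "/costs");
        Map<String, String> attributes = extractAttributes(requestHeaders);
        System.out.println(attributes);
        System.out.println(toHeaders(attributes));
    }
}
```

Headers without one of the two prefixes, such as Content-Type here, are not treated as cloud event attributes in this sketch.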
Reactive Messaging
This extension utilizes SmallRye Reactive Messaging to build data streaming applications.
If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.