Usage
Add your channel configuration to src/main/resources/application.properties.
Then configure each channel to use the HiveMQ connector type:
# Configure the MQTT sink (we write to it)
mp.messaging.outgoing.topic-price.connector=smallrye-mqtt-hivemq
mp.messaging.outgoing.topic-price.topic=prices
mp.messaging.outgoing.topic-price.auto-generated-client-id=true
# Configure the MQTT source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-mqtt-hivemq
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.auto-generated-client-id=true
Note: Change the topic and channel names according to your needs.
Then implement your business logic:
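package incoming;

import java.nio.charset.StandardCharsets;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MqttPriceConsumer {

    // Receives each message from the "prices" channel as raw bytes.
    @Incoming("prices")
    public void consume(byte[] raw) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        double price = Double.parseDouble(new String(raw, StandardCharsets.UTF_8));
        // process your price.
    }
}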
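The consumer above assumes every payload is a well-formed number. As a minimal sketch of more defensive decoding, the helper below uses only the JDK and could be called from consume(byte[]). The class name PricePayloads and the method parsePrice are hypothetical and not part of the connector, and the sketch assumes the payload carries the price as UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

// Hypothetical helper for illustration; not part of the connector or the original example.
final class PricePayloads {

    private PricePayloads() {
    }

    // Decodes a payload assumed to carry a decimal number as UTF-8 text.
    // Returns an empty OptionalDouble when the payload is null or not a number.
    static OptionalDouble parsePrice(byte[] raw) {
        if (raw == null) {
            return OptionalDouble.empty();
        }
        String text = new String(raw, StandardCharsets.UTF_8).trim();
        try {
            return OptionalDouble.of(Double.parseDouble(text));
        } catch (NumberFormatException e) {
            return OptionalDouble.empty();
        }
    }

    public static void main(String[] args) {
        byte[] payload = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(parsePrice(payload));
    }
}
```

Returning an OptionalDouble lets the consumer decide whether to skip or log a malformed message rather than letting a NumberFormatException propagate out of the @Incoming method.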
package outgoing;

import java.time.Duration;
import java.util.Random;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceGenerator {

    private static final Logger LOG = Logger.getLogger(PriceGenerator.class);

    private final Random random = new Random();

    // Emits a random price between 0 and 99 every second on the "topic-price" channel.
    @Outgoing("topic-price")
    public Multi<Integer> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onOverflow().drop()
                .map(tick -> {
                    int price = random.nextInt(100);
                    LOG.infof("Sending price: %d", price);
                    return price;
                });
    }
}
Note: In the example above, we push events into the channel topic-price, which, according to application.properties, points to the topic prices. We then consume these events through the channel prices, which points to the same topic.
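To make the channel-to-topic relationship concrete, the following JDK-only sketch reads the same property keys and lists which topic each channel points to. It is purely illustrative: the connector resolves channels itself at runtime, and the class ChannelTopics and method topicsByChannel are hypothetical names introduced here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative only; this is not how the connector is implemented.
final class ChannelTopics {

    private static final String PREFIX = "mp.messaging.";
    private static final String SUFFIX = ".topic";

    private ChannelTopics() {
    }

    // Maps "<direction>.<channel>" to the topic configured for that channel.
    static Map<String, String> topicsByChannel(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
            if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                String channel = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(channel, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "mp.messaging.outgoing.topic-price.connector=smallrye-mqtt-hivemq",
                "mp.messaging.outgoing.topic-price.topic=prices",
                "mp.messaging.incoming.prices.connector=smallrye-mqtt-hivemq",
                "mp.messaging.incoming.prices.topic=prices");
        topicsByChannel(config).forEach((channel, topic) ->
                System.out.println(channel + " -> " + topic));
    }
}
```

With the configuration from this section, both incoming.prices and outgoing.topic-price map to the topic prices, which is why the consumer receives what the generator sends.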