Use messaging and events
Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye), so workflows can listen to and emit CloudEvents over Kafka (or any supported connector). It activates automatically when Quarkus Messaging is on the classpath.
This guide shows how to:
- activate the Flow ↔ Reactive Messaging bridge
- use the default flow-in/flow-out channels with Kafka
- enable lifecycle events
- override the bridge with your own messaging beans
- understand the serialization format
Prerequisites
- A Quarkus application with Quarkus Flow set up.
- Quarkus Messaging on the classpath (for example quarkus-messaging-kafka).
- Basic familiarity with MicroProfile Reactive Messaging configuration (mp.messaging.*).
1. Activate the bridge
There is no separate quarkus-flow-messaging artifact to add. The bridge is auto-registered when any quarkus-messaging-* connector is present (for example, Kafka, AMQP, or JMS).
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
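If you build with Gradle instead, the equivalent dependency declaration is:

implementation("io.quarkus:quarkus-messaging-kafka")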
Enable the default bridge bean with:
quarkus.flow.messaging.defaults-enabled=true
This creates a CDI bean that consumes domain events from flow-in and publishes
domain events to flow-out.
2. Understand what you get
Once enabled, the default bridge provides:
- Default channels – flow-in (inbound) and flow-out (outbound) for structured CloudEvents JSON.
- Connector-agnostic wiring – works with any SmallRye connector you add (Kafka, JMS, AMQP, …).
- Optional lifecycle stream – publish engine lifecycle CloudEvents on flow-lifecycle-out.
- BYO integration – you can override with your own EventConsumer/EventPublisher beans if needed.
3. Quickstart with Kafka
3.1 Map default channels to Kafka topics
# Inbound (structured CloudEvents JSON)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Outbound (structured CloudEvents JSON)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092
3.2 Define a tiny event-driven workflow
package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first
                                .outputAs((java.util.Collection<Object> c) -> c.iterator().next()),
                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),
                        // Emit the response event
                        emitJson("org.acme.hello.response", java.util.Map.class))
                .build();
    }
}
With this in place:
- external producers can send CloudEvents to the flow-in topic to start or continue workflows (see the producer sketch below)
- workflows can emit CloudEvents, which are then written to the flow-out topic
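To try this end to end from inside the same application, one option is a small producer bean that writes a structured CloudEvent to the flow-in topic. This is a sketch, not part of the bridge: the HelloRequestProducer class, the hello-requests channel name, and its mp.messaging.outgoing.* mapping are assumptions made for this example.

package org.acme;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class HelloRequestProducer {

    // Hypothetical outbound channel; map it to the flow-in topic, for example:
    //   mp.messaging.outgoing.hello-requests.connector=smallrye-kafka
    //   mp.messaging.outgoing.hello-requests.topic=flow-in
    //   mp.messaging.outgoing.hello-requests.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    @Inject
    @Channel("hello-requests")
    Emitter<byte[]> emitter;

    public void sendHello(String name) {
        // Structured CloudEvents JSON, matching the type the workflow listens for.
        String event = """
                {
                  "specversion": "1.0",
                  "id": "%s",
                  "source": "/org/acme/producer",
                  "type": "org.acme.hello.request",
                  "datacontenttype": "application/json",
                  "data": { "name": "%s" }
                }
                """.formatted(UUID.randomUUID(), name);
        emitter.send(event.getBytes(StandardCharsets.UTF_8));
    }
}

Once a request is sent, the matching org.acme.hello.response event shows up on the flow-out topic as structured CloudEvents JSON.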
4. Enable lifecycle events (optional)
You can also enable a separate lifecycle stream (engine signals: task/workflow started, completed, suspended, faulted, …):
quarkus.flow.messaging.lifecycle-enabled=true
mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
Common event types include:
- io.serverlessworkflow.task.started.v1
- io.serverlessworkflow.task.completed.v1
- io.serverlessworkflow.workflow.started.v1
- io.serverlessworkflow.workflow.completed.v1
- and similar variants for faults and suspensions
This stream is useful for observability dashboards and external monitoring systems.
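As an illustration, here is a minimal sketch of feeding the stream into your own monitoring code. The LifecycleMonitor class, the flow-lifecycle-monitor channel name, and its mp.messaging.incoming.* mapping are choices made for this example, not defaults:

package org.acme;

import java.nio.charset.StandardCharsets;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class LifecycleMonitor {

    // Hypothetical inbound channel; map it to the lifecycle topic, for example:
    //   mp.messaging.incoming.flow-lifecycle-monitor.connector=smallrye-kafka
    //   mp.messaging.incoming.flow-lifecycle-monitor.topic=flow-lifecycle-out
    //   mp.messaging.incoming.flow-lifecycle-monitor.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    @Incoming("flow-lifecycle-monitor")
    public void onLifecycleEvent(byte[] payload) {
        // Structured CloudEvents JSON; log it or forward it to a metrics/alerting system.
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}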
5. Bring your own messaging (advanced)
If you need full control over how events are consumed and produced, you can provide your own beans:
- exactly one io.serverlessworkflow.impl.events.EventConsumer (inbound)
- zero or more io.serverlessworkflow.impl.events.EventPublisher (outbound)
If none are provided, the defaults described above apply when quarkus.flow.messaging.defaults-enabled=true.
This lets you:
- integrate with existing messaging abstractions
- route events through custom logic before they reach the engine
- publish only a subset of events, or enrich them before sending
6. Notes on serialization
- Flow Messaging uses structured CloudEvents JSON end-to-end (byte[] payload on the wire).
- Event data is typically application/json. Jackson handles (de)serialization of payloads referenced by your workflow tasks.
- On the Reactive Messaging side, you configure byte[] serializers/deserializers (as shown in the Kafka example); the bridge takes care of turning those bytes into CloudEvents (see the sketch below).
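The bridge performs this conversion for you; purely for illustration, this is roughly what decoding structured-mode bytes looks like with the CloudEvents Java SDK. The CloudEventCodec helper is hypothetical, and it assumes io.cloudevents:cloudevents-json-jackson is on the classpath:

package org.acme;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

public final class CloudEventCodec {

    private CloudEventCodec() {
    }

    // Turns structured-mode CloudEvents JSON bytes into a CloudEvent object.
    public static CloudEvent decode(byte[] bytes) {
        return EventFormatProvider.getInstance()
                .resolveFormat(JsonFormat.CONTENT_TYPE)
                .deserialize(bytes);
    }
}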
See also
- Java DSL cheatsheet – Emit/Listen patterns in the Java DSL.
- CNCF Workflow mapping and concepts – CNCF Workflow Emit/Listen and event model.
- Enable tracing – trace event-driven workflows and correlate with messaging.