Use messaging and events
Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye). This allows your workflows to act as fully event-driven orchestrations: they can be triggered by events, emit events to external systems, and pause execution while listening for asynchronous callbacks.
The bridge activates automatically when a supported Quarkus Messaging connector (like Kafka or AMQP) is on the classpath.
This guide shows how to:
- Activate the Flow ↔ Reactive Messaging bridge.
- Map the default flow-in/flow-out channels to Kafka.
- Use the Java DSL to emit and listen for events.
- Enable automated lifecycle events.
- Understand the CloudEvent correlation metadata.
Prerequisites
- A Quarkus application with Quarkus Flow set up.
- Quarkus Messaging on the classpath (e.g., quarkus-messaging-kafka).
- Basic familiarity with MicroProfile Reactive Messaging configuration (mp.messaging.*).
1. Add the dependencies and activate the bridge
You do not need a separate quarkus-flow-messaging artifact. The bridge is auto-registered when any quarkus-messaging-* connector is present.
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
To activate the bridge, enable the default domain event channels in your application.properties:
quarkus.flow.messaging.defaults-enabled=true
Once enabled, the engine creates two default channels:
- flow-in (inbound): Consumes structured CloudEvents JSON to start or wake up workflows.
- flow-out (outbound): Publishes structured CloudEvents JSON emitted by workflows.
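For example, a structured CloudEvent that triggers the hello workflow shown later in this guide could look like the following. This is an illustrative sketch: the id, source, and data values are placeholders, not values the engine requires.

```json
{
  "specversion": "1.0",
  "id": "4f8a2e0c-9b1d-4c72-8e33-5a6b7c8d9e0f",
  "source": "/my/producer",
  "type": "org.acme.hello.request",
  "datacontenttype": "application/json",
  "data": { "name": "Alice" }
}
```

Any producer that writes this JSON to the flow-in topic can start or resume a workflow listening for that event type.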
2. Map channels to Kafka topics
Configure MicroProfile Reactive Messaging to connect the default Flow channels to your Kafka topics.
Notice that Quarkus Flow uses byte[] for the payload on the wire; the engine handles the serialization and deserialization of the CloudEvent JSON envelope automatically.
# Inbound (Listens for events to start or resume workflows)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Outbound (Publishes events emitted by the workflow)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092
3. Emit and Listen in the Java DSL
In your Quarkus Flow definitions, you interact with these Kafka topics using two primary tasks:
- emitJson("event.type", Pojo.class): Takes a standard Java POJO from your workflow data, wraps it in a CloudEvent envelope with the specified type, and publishes it to flow-out.
- listen("taskName", toOne("event.type")): Pauses the workflow instance and releases the execution thread. The engine defines an Event Filter that waits for a CloudEvent of type event.type to arrive on flow-in. When it does, the engine automatically correlates the event to the paused instance, extracts the payload, and resumes execution.
Here is a complete example of an event-driven workflow:
package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first element
                                .outputAs((java.util.Collection<Object> c) -> c.iterator().next()),
                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),
                        // Emit the response event
                        emitJson("org.acme.hello.response", java.util.Map.class))
                .build();
    }
}
With this in place:
- External producers can send CloudEvents to the flow-in topic to start or continue workflows.
- Workflows can emit CloudEvents, which are then written to the flow-out topic.
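As an illustration, the response emitted by the example workflow would appear on the flow-out topic as structured CloudEvents JSON, carrying the correlation extension attributes covered in section 5. The id, source, and correlation values below are placeholders:

```json
{
  "specversion": "1.0",
  "id": "d6b2a9d4-7c41-4e6e-9a2f-0b5c3e8f1a22",
  "source": "/quarkus-flow/hello-messaging",
  "type": "org.acme.hello.response",
  "datacontenttype": "application/json",
  "flowinstanceid": "0e1f2a3b-4c5d-6e7f-8091-a2b3c4d5e6f7",
  "flowtaskid": "do/0/task",
  "data": { "message": "Hello Alice" }
}
```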
4. Enable lifecycle events (Optional)
You can configure the engine to publish its internal state changes (e.g., when a workflow starts, suspends, or faults) to a dedicated Kafka topic. This is highly useful for building observability dashboards or audit logs.
quarkus.flow.messaging.lifecycle-enabled=true
mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
Common lifecycle event types published to this topic include:
- io.serverlessworkflow.workflow.started.v1
- io.serverlessworkflow.workflow.completed.v1
- io.serverlessworkflow.task.started.v1
- io.serverlessworkflow.task.suspended.v1
- io.serverlessworkflow.task.faulted.v1
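As a sketch, a workflow-started lifecycle event on flow-lifecycle-out would carry an envelope along these lines. The id and source values are placeholders, and the engine-defined data payload is omitted because its schema depends on the engine version:

```json
{
  "specversion": "1.0",
  "id": "c0ffee00-1234-5678-9abc-def012345678",
  "source": "/quarkus-flow/hello-messaging",
  "type": "io.serverlessworkflow.workflow.started.v1",
  "datacontenttype": "application/json"
}
```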
5. CloudEvent Correlation Headers
For idempotency and end-to-end distributed traceability, Quarkus Flow automatically attaches correlation metadata to all emitted events via CloudEvent Extension Context Attributes.
By default, every event published to flow-out will include:
- flowinstanceid: The UUID of the executing WorkflowInstance.
- flowtaskid: The JSON Pointer of the specific task that emitted the event (e.g., do/0/task).
If you are writing a custom consumer downstream, you can extract these attributes directly from the CloudEvent:
import io.cloudevents.CloudEvent;
import io.quarkus.logging.Log;

public void printCorrelation(CloudEvent event) {
    String instanceId = (String) event.getExtension("flowinstanceid");
    String taskId = (String) event.getExtension("flowtaskid");
    Log.infof("Flow Instance ID: %s", instanceId);
    Log.infof("Flow Task ID: %s", taskId);
}
If you need to rename these keys to match your enterprise naming conventions, or disable them entirely:
quarkus.flow.messaging.metadata.instance-id.key=mycompany-instance-id
quarkus.flow.messaging.metadata.task-id.key=mycompany-task-id
# To disable injection entirely:
# quarkus.flow.messaging.enable-metadata-propagation=false
6. Bring your own messaging (Advanced)
If you need full control over how events are consumed and produced—for example, to route events through custom enrichment logic before they reach the engine, or to filter which events get published—you can bypass the flow-in and flow-out channels.
Provide your own CDI beans implementing the Flow interfaces:
- Exactly one io.serverlessworkflow.impl.events.EventConsumer (for inbound events).
- Zero or more io.serverlessworkflow.impl.events.EventPublisher (for outbound events).
When you provide these beans, the default bridge is ignored.
See also
- Data flow and context management — how to shape event payloads.
- Java DSL cheatsheet — syntax for emit, listen, and event builders.
- Serverless Workflow Spec mapping and concepts — the spec rules for event consumption.