Events & Messaging
Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye), so workflows can listen to and emit CloudEvents over Kafka (or any supported connector). It activates automatically when Quarkus Messaging is on the classpath.
Activation
You do not add a separate quarkus-flow-messaging artifact.
The bridge is auto-registered when any quarkus-messaging-* connector is present (e.g., Kafka, AMQP, JMS).
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
Enable the default bridge bean with:
quarkus.flow.messaging.defaults-enabled=true
This creates a CDI bean that consumes domain events from flow-in and publishes domain events to flow-out.
What you get
- Default channels – flow-in (inbound) and flow-out (outbound) for structured CloudEvents JSON.
- Connector-agnostic – works with any SmallRye connector you add (Kafka, JMS, AMQP, …).
- Optional lifecycle stream – publish engine lifecycle CloudEvents on flow-lifecycle-out.
- BYO integration – you can override with your own EventConsumer / EventPublisher beans if needed.
Quickstart (Kafka)
3) Map default channels to Kafka topics
# Inbound (structured CloudEvents JSON)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Outbound (structured CloudEvents JSON)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092
4) A tiny event-driven workflow
package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first
                                .outputAs((java.util.Collection<Object> c) -> c.iterator().next()),
                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),
                        // Emit the response event
                        emitJson("org.acme.hello.response", java.util.Map.class))
                .build();
    }
}
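To exercise the workflow above, something has to publish a request event to the flow-in topic. The sketch below builds such a payload as structured CloudEvents JSON using only the JDK. The class name HelloRequestEvent, the source value /acme/demo, and the assumption that the workflow reads name from the event data are illustrative and not part of Quarkus Flow. Only the required CloudEvents attributes (specversion, id, source, type) plus datacontenttype and data are set.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper, not part of Quarkus Flow: builds a request event
// in structured CloudEvents JSON, ready to be sent as a byte[] record value.
final class HelloRequestEvent {

    // Hypothetical source URI, chosen only for this illustration.
    static final String SOURCE = "/acme/demo";

    /** Escapes backslashes, double quotes, and control characters for a JSON string. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Renders the event as a structured-mode CloudEvents JSON document. */
    static String toJson(String id, String name) {
        return "{"
                + "\"specversion\":\"1.0\","
                + "\"id\":\"" + jsonEscape(id) + "\","
                + "\"source\":\"" + SOURCE + "\","
                + "\"type\":\"org.acme.hello.request\","
                + "\"datacontenttype\":\"application/json\","
                + "\"data\":{\"name\":\"" + jsonEscape(name) + "\"}"
                + "}";
    }

    /** UTF-8 bytes with a random event id, matching a byte[] value serializer. */
    static byte[] toBytes(String name) {
        return toJson(UUID.randomUUID().toString(), name).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toJson("demo-1", "World"));
    }
}
```

The resulting bytes can be sent with any Kafka producer or CLI that writes raw record values to the flow-in topic. In a real application a JSON library such as Jackson would normally replace the hand-rolled string building.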
Lifecycle events (optional)
Enable a separate lifecycle stream (engine signals: task/workflow started, completed, suspended, faulted, …):
quarkus.flow.messaging.lifecycle-enabled=true
mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
Common types:
io.serverlessworkflow.task.started.v1, …task.completed.v1, …workflow.started.v1, …workflow.completed.v1, etc.
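A consumer of the lifecycle topic usually wants to route events by their type attribute. The following is a minimal sketch of such a classifier, assuming that lifecycle types follow the shape io.serverlessworkflow.<scope>.<phase>.v<N> seen in the examples above. That pattern is inferred from the listed types rather than taken from a specification, and LifecycleTypes and Parsed are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Quarkus Flow.
final class LifecycleTypes {

    /** Parsed pieces of a lifecycle event type. */
    record Parsed(String scope, String phase, int version) {}

    // Assumed shape, inferred from the documented examples:
    // io.serverlessworkflow.<task|workflow>.<phase>.v<N>
    private static final Pattern TYPE = Pattern.compile(
            "^io\\.serverlessworkflow\\.(task|workflow)\\.([a-z]+)\\.v(\\d{1,4})$");

    /** Returns the parsed type, or empty if the string is not a lifecycle type. */
    static Optional<Parsed> parse(String type) {
        Matcher m = TYPE.matcher(type);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(
                new Parsed(m.group(1), m.group(2), Integer.parseInt(m.group(3))));
    }

    public static void main(String[] args) {
        // Domain events such as org.acme.hello.request do not match and are skipped.
        parse("io.serverlessworkflow.task.started.v1")
                .ifPresent(p -> System.out.println(p.scope() + " " + p.phase()));
    }
}
```

Returning an Optional keeps unrelated or unexpected event types from failing the consumer; they can simply be ignored or logged.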
Bring your own messaging (advanced)
Provide your own beans to take full control:
- One io.serverlessworkflow.impl.events.EventConsumer (inbound)
- Zero or more io.serverlessworkflow.impl.events.EventPublisher (outbound)
If you provide none, the defaults described above apply when quarkus.flow.messaging.defaults-enabled=true.
Notes on serialization
- We use structured CloudEvents JSON end-to-end (byte[] payload on the wire).
- Event data is typically application/json. Jackson handles (de)serialization of payloads referenced by your workflow tasks.
See also
- DSL cheatsheet (Emit/Listen patterns): Java DSL cheatsheet
- Spec reference (Emit/Listen): Specification (quick reference)