Use messaging and events

Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye). This allows your workflows to act as fully event-driven orchestrations: they can be triggered by events, emit events to external systems, and pause execution while listening for asynchronous callbacks.

The bridge activates automatically when a supported Quarkus Messaging connector (like Kafka or AMQP) is on the classpath.

This guide shows how to:

  • Activate the Flow ↔ Reactive Messaging bridge.

  • Map the default flow-in / flow-out channels to Kafka.

  • Use the Java DSL to emit and listen for events.

  • Enable automated lifecycle events.

  • Understand the CloudEvent correlation metadata.

Prerequisites

  • A Quarkus application with Quarkus Flow set up.

  • Quarkus Messaging on the classpath (e.g., quarkus-messaging-kafka).

  • Basic familiarity with MicroProfile Reactive Messaging configuration (mp.messaging.*).

1. Add the dependencies and activate the bridge

You do not need a separate quarkus-flow-messaging artifact. The bridge is auto-registered when any quarkus-messaging-* connector is present.

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

To activate the bridge, enable the default domain event channels in your application.properties:

quarkus.flow.messaging.defaults-enabled=true

Once enabled, the engine creates two default channels:

  • flow-in (Inbound): Consumes structured CloudEvents JSON to start or wake up workflows.

  • flow-out (Outbound): Publishes structured CloudEvents JSON emitted by workflows.

2. Map channels to Kafka topics

Configure MicroProfile Reactive Messaging to connect the default Flow channels to your Kafka topics.

Notice that Quarkus Flow uses byte[] for the payload on the wire; the engine handles the serialization and deserialization of the CloudEvent JSON envelope automatically.

# Inbound (Listens for events to start or resume workflows)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Outbound (Publishes events emitted by the workflow)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092

3. Emit and Listen in the Java DSL

In your Quarkus Flow definitions, you interact with these Kafka topics using two primary tasks:

  • emitJson("event.type", Pojo.class): Takes a standard Java POJO from your workflow data, wraps it in a CloudEvent envelope with the specified type, and publishes it to flow-out.

  • listen("taskName", toOne("event.type")): Pauses the workflow instance and releases the execution thread. The engine defines an Event Filter waiting for a CloudEvent of event.type to arrive on flow-in. When it does, the engine automatically correlates it to the paused instance, extracts the payload, and resumes execution.

Here is a complete example of an event-driven workflow:

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first
                                .outputAs((java.util.Collection<Object> c) -> c.iterator().next()),

                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),

                        // Emit the response event
                        emitJson("org.acme.hello.response", java.util.Map.class))
                .build();
    }
}

With this in place:

  • External producers can send CloudEvents to the flow-in topic to start or continue workflows.

  • Workflows can emit CloudEvents, which are then written to the flow-out topic.

4. Enable lifecycle events (Optional)

You can configure the engine to publish its internal state changes (e.g., when a workflow starts, suspends, or faults) to a dedicated Kafka topic. This is highly useful for building observability dashboards or audit logs.

quarkus.flow.messaging.lifecycle-enabled=true

mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

Common lifecycle event types published to this topic include:

  • io.serverlessworkflow.workflow.started.v1

  • io.serverlessworkflow.workflow.completed.v1

  • io.serverlessworkflow.task.started.v1

  • io.serverlessworkflow.task.suspended.v1

  • io.serverlessworkflow.task.faulted.v1

5. CloudEvent Correlation Headers

For idempotency and end-to-end distributed traceability, Quarkus Flow automatically attaches correlation metadata to all emitted events via CloudEvent Extension Context Attributes.

By default, every event published to flow-out will include:

  • flowinstanceid: The UUID of the executing WorkflowInstance.

  • flowtaskid: The JSON Pointer of the specific task that emitted the event (e.g., do/0/task).

If you are writing a custom consumer downstream, you can extract these attributes directly from the CloudEvent:

import io.cloudevents.CloudEvent;
import io.quarkus.logging.Log;

public void printCorrelation(CloudEvent event) {
    String instanceId = (String) event.getExtension("flowinstanceid");
    String taskId = (String) event.getExtension("flowtaskid");

    Log.infof("Flow Instance ID: %s", instanceId);
    Log.infof("Flow Task ID: %s", taskId);
}

If you need to rename these keys to match your enterprise naming conventions, or disable them entirely:

quarkus.flow.messaging.metadata.instance-id.key=mycompany-instance-id
quarkus.flow.messaging.metadata.task-id.key=mycompany-task-id

# To disable injection entirely:
# quarkus.flow.messaging.enable-metadata-propagation=false

6. Bring your own messaging (Advanced)

If you need full control over how events are consumed and produced—for example, to route events through custom enrichment logic before they reach the engine, or to filter which events get published—you can bypass the flow-in and flow-out channels.

Provide your own CDI beans implementing the Flow interfaces:

  • Exactly one io.serverlessworkflow.impl.events.EventConsumer (for inbound events).

  • Zero or more io.serverlessworkflow.impl.events.EventPublisher (for outbound events).

When you provide these beans, the default bridge is ignored.

See also