Use messaging and events

Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye), so workflows can listen to and emit CloudEvents over Kafka (or any supported connector). It activates automatically when Quarkus Messaging is on the classpath.

This guide shows how to:

  • activate the Flow ↔ Reactive Messaging bridge

  • use the default flow-in / flow-out channels with Kafka

  • enable lifecycle events

  • override the bridge with your own messaging beans

  • understand the serialization format

Prerequisites

  • A Quarkus application with Quarkus Flow set up.

  • Quarkus Messaging on the classpath (for example quarkus-messaging-kafka).

  • Basic familiarity with MicroProfile Reactive Messaging configuration (mp.messaging.*).

1. Activate the bridge

You do not add a separate quarkus-flow-messaging artifact. The bridge is auto-registered when any quarkus-messaging-* connector is present (for example, Kafka, AMQP, JMS).

Maven (Kafka connector)
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

Enable the default bridge bean with:

quarkus.flow.messaging.defaults-enabled=true

This creates a CDI bean that consumes domain events from flow-in and publishes domain events to flow-out.

2. Understand what you get

Once enabled, the default bridge provides:

  • Default channelsflow-in (inbound) and flow-out (outbound) for structured CloudEvents JSON.

  • Connector-agnostic wiring – works with any SmallRye connector you add (Kafka, JMS, AMQP, …).

  • Optional lifecycle stream – publish engine lifecycle CloudEvents on flow-lifecycle-out.

  • BYO integration – you can override with your own EventConsumer / EventPublisher beans if needed.

3. Quickstart with Kafka

3.1 Add the connector

Add quarkus-messaging-kafka as shown above; the bridge auto-activates.

3.2 Turn on the default bridge

quarkus.flow.messaging.defaults-enabled=true

3.3 Map default channels to Kafka topics

# Inbound (structured CloudEvents JSON)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Outbound (structured CloudEvents JSON)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092

3.4 Define a tiny event-driven workflow

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first
                                .outputAs((java.util.Collection<Object> c) -> c.iterator().next()),

                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),

                        // Emit the response event
                        emitJson("org.acme.hello.response", java.util.Map.class))
                .build();
    }
}

With this in place:

  • external producers can send CloudEvents to the flow-in topic to start or continue workflows

  • workflows can emit CloudEvents, which are then written to the flow-out topic

4. Enable lifecycle events (optional)

You can also enable a separate lifecycle stream (engine signals: task/workflow started, completed, suspended, faulted, …):

quarkus.flow.messaging.lifecycle-enabled=true

mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

Common event types include:

  • io.serverlessworkflow.task.started.v1

  • io.serverlessworkflow.task.completed.v1

  • io.serverlessworkflow.workflow.started.v1

  • io.serverlessworkflow.workflow.completed.v1

  • and similar variants for faults/suspensions.

This stream is useful for observability dashboards and external monitoring systems.

5. Bring your own messaging (advanced)

If you need full control over how events are consumed and produced, you can provide your own beans:

  • exactly one io.serverlessworkflow.impl.events.EventConsumer (inbound)

  • zero or more io.serverlessworkflow.impl.events.EventPublisher (outbound)

If none are provided, the defaults described above apply when quarkus.flow.messaging.defaults-enabled=true.

This lets you:

  • integrate with existing messaging abstractions

  • route events through custom logic before they reach the engine

  • publish only a subset of events, or enrich them before sending

6. Notes on serialization

  • Flow Messaging uses structured CloudEvents JSON end-to-end (byte[] payload on the wire).

  • Event data is typically application/json. Jackson handles (de)serialization of payloads referenced by your workflow tasks.

  • On the Reactive Messaging side, you configure byte[] serializers/deserializers (as shown in the Kafka example); the bridge takes care of turning those bytes into CloudEvents.

See also