Events & Messaging

Quarkus Flow Messaging bridges the workflow engine with MicroProfile Reactive Messaging (SmallRye), so workflows can listen to and emit CloudEvents over Kafka (or any supported connector). It activates automatically when Quarkus Messaging is on the classpath.

Activation

You do not add a separate quarkus-flow-messaging artifact. The bridge is auto-registered when any quarkus-messaging-* connector is present (e.g., Kafka, AMQP, JMS).

Maven (Kafka connector)
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
Enable the default bridge bean with:
quarkus.flow.messaging.defaults-enabled=true

This creates a CDI bean that consumes domain events from flow-in and publishes domain events to flow-out.
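For reference, an inbound message on flow-in is expected to be a structured CloudEvents 1.0 JSON document; a rough illustration with placeholder values:

```json
{
  "specversion": "1.0",
  "id": "req-1",
  "type": "org.acme.hello.request",
  "source": "/client",
  "datacontenttype": "application/json",
  "data": { "name": "Ada" }
}
```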

What you get

  • Default channels – flow-in (inbound) and flow-out (outbound) for structured CloudEvents JSON.

  • Connector-agnostic – works with any SmallRye connector you add (Kafka, JMS, AMQP, …).

  • Optional lifecycle stream – publish engine lifecycle CloudEvents on flow-lifecycle-out.

  • BYO integration – you can override with your own EventConsumer/EventPublisher beans if needed.

Quickstart (Kafka)

1) Add the connector

Add the quarkus-messaging-kafka dependency shown above; the bridge auto-activates.

2) Turn on the default bridge

quarkus.flow.messaging.defaults-enabled=true

3) Map default channels to Kafka topics

# Inbound (structured CloudEvents JSON)
mp.messaging.incoming.flow-in.connector=smallrye-kafka
mp.messaging.incoming.flow-in.topic=flow-in
mp.messaging.incoming.flow-in.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.incoming.flow-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Outbound (structured CloudEvents JSON)
mp.messaging.outgoing.flow-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-out.topic=flow-out
mp.messaging.outgoing.flow-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
mp.messaging.outgoing.flow-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Optional if Dev Services is off:
# kafka.bootstrap.servers=localhost:9092

4) A tiny event-driven workflow

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import java.util.Collection;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

@ApplicationScoped
public class HelloMessagingFlow extends Flow {

    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("hello-messaging")
                .tasks(
                        // Wait for one request event
                        listen("waitHello", to().one(event("org.acme.hello.request")))
                                // listen() returns a collection; pick the first element
                                .outputAs((Collection<Object> c) -> c.iterator().next()),

                        // Build a response with jq
                        set("{ message: \"Hello \" + .name }"),

                        // Emit the response event
                        emitJson("org.acme.hello.response", Map.class))
                .build();
    }
}
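Sending a request like the following to the flow-in topic should yield a response event of type org.acme.hello.response on flow-out; per the jq expression, data { "name": "Ada" } becomes { "message": "Hello Ada" }. The envelope values below are placeholders; the emitted event's id and source are engine-generated.

```json
{
  "specversion": "1.0",
  "id": "req-1",
  "type": "org.acme.hello.request",
  "source": "/client",
  "datacontenttype": "application/json",
  "data": { "name": "Ada" }
}
```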

Lifecycle events (optional)

Enable a separate lifecycle stream (engine signals: task/workflow started, completed, suspended, faulted, …):

quarkus.flow.messaging.lifecycle-enabled=true

mp.messaging.outgoing.flow-lifecycle-out.connector=smallrye-kafka
mp.messaging.outgoing.flow-lifecycle-out.topic=flow-lifecycle-out
mp.messaging.outgoing.flow-lifecycle-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.flow-lifecycle-out.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

Common types: io.serverlessworkflow.task.started.v1, …task.completed.v1, …workflow.started.v1, …workflow.completed.v1, etc.
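Illustratively, a lifecycle event on flow-lifecycle-out is itself a structured CloudEvent along these lines (attribute values and the exact data shape are engine-defined and may differ):

```json
{
  "specversion": "1.0",
  "id": "<engine-generated>",
  "type": "io.serverlessworkflow.workflow.started.v1",
  "source": "/hello-messaging",
  "datacontenttype": "application/json",
  "data": {}
}
```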

Bring your own messaging (advanced)

Provide your own beans to take full control:

  • One io.serverlessworkflow.impl.events.EventConsumer (inbound)

  • Zero or more io.serverlessworkflow.impl.events.EventPublisher (outbound)

If none are provided, the defaults above apply when defaults-enabled=true.

Notes on serialization

  • We use structured CloudEvents JSON end-to-end (byte[] payload on the wire).

  • Event data is typically application/json. Jackson handles (de)serialization of payloads referenced by your workflow tasks.
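Since the wire format is just UTF-8 JSON bytes, a structured event can be assembled and decoded with nothing but the JDK. A minimal sketch — the envelope string is hand-built here purely for illustration; in the application, Jackson and the bridge handle this:

```java
import java.nio.charset.StandardCharsets;

public class StructuredWireFormat {

    // Encode a structured CloudEvents JSON document to the byte[]
    // that the ByteArraySerializer puts on the wire.
    static byte[] toWire(String structuredJson) {
        return structuredJson.getBytes(StandardCharsets.UTF_8);
    }

    // Decode wire bytes back to JSON text, as the
    // ByteArrayDeserializer side of the bridge sees it.
    static String fromWire(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String event = "{\"specversion\":\"1.0\",\"id\":\"req-1\","
                + "\"type\":\"org.acme.hello.request\",\"source\":\"/client\","
                + "\"datacontenttype\":\"application/json\",\"data\":{\"name\":\"Ada\"}}";
        String roundTripped = fromWire(toWire(event));
        System.out.println(roundTripped.equals(event)); // prints true
    }
}
```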

See also