Quarkus Flow & CNCF Serverless Workflow Java Cookbook

This cookbook provides a comprehensive collection of practical, copy-and-paste examples for building modern workflows using the Serverless Workflow Java SDK Fluent API.

All workflows in Quarkus Flow are defined as CDI beans by extending the Flow class and overriding the descriptor() method.

We recommend using the FuncWorkflowBuilder with FuncDSL.

Essential Setup

Every workflow should follow this structure.

package org.acme;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.spec.FuncWorkflowBuilder;
import jakarta.enterprise.context.ApplicationScoped;

// Recommended Static Imports
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

@ApplicationScoped
public class MyWorkflow extends Flow {
    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("my-workflow-id")
            .tasks(
                // tasks go here
            )
            .build();
    }
}

1. Scheduled Executions (CRON)

Trigger workflows on a schedule by defining a schedule within the builder.

See the guide on how to schedule workflow executions for more details.

package org.acme;

// Static imports recommended for brevity:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import java.util.Date;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: a workflow that declares a cron schedule in its definition.
 */
@ApplicationScoped
public class CronWorkflow extends Flow {

    /**
     * Builds the "cron-workflow" definition: a cron schedule followed by a single
     * {@code set} task that stores a "message" entry.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        // NOTE(review): the text (timestamp included) is assembled while descriptor() runs,
        // not inside the task itself - confirm this is the intended moment for new Date().
        final String message = "Executed Cron Workflow at: " + new Date();

        return FuncWorkflowBuilder.workflow("cron-workflow")
                .schedule(cron("* * * * * ?")) // Every second
                .tasks(set(Map.of("message", message)))
                .build();
    }
}

2. External API Integrations (HTTP REST)

Use the call task with http to interact with REST endpoints, managing headers and query parameters dynamically.

package org.acme;

// Static imports recommended for brevity:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.call;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: an HTTP GET whose query parameter and header are supplied as jq expressions.
 */
@ApplicationScoped
public class HttpWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "http-with-query-headers" definition (namespace "org.acme", version "1.0")
     * containing one {@code call} task named "searchStarWarsCharacters".
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String peopleEndpoint = wiremockUrl + "/api/people";

        return FuncWorkflowBuilder.workflow("http-with-query-headers", "org.acme", "1.0")
                .tasks(call("searchStarWarsCharacters",
                        http()
                                .GET()
                                // jq expression: the search term comes from the workflow input
                                .query("search", "${ .searchQuery }")
                                .endpoint(peopleEndpoint)
                                // jq expression: the Accept header value comes from the workflow input
                                .header("Accept", "${ .acceptHeaderValue }")
                                // expose the GET response as the task output
                                .exportAsTaskOutput()))
                .build();
    }
}

3. OpenAPI Services

Invoke OpenAPI endpoints by referencing the specification and the operationId.

package org.acme;

// Static imports recommended for brevity:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: invoking an operation described by an OpenAPI document.
 */
@ApplicationScoped
public class OpenApiWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "openapi-call-workflow" definition: one OpenAPI task that points at the
     * specification document, names the operationId and passes its parameters.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String specDocument = wiremockUrl + "/v2/swagger.json";

        return FuncWorkflowBuilder.workflow("openapi-call-workflow")
                .tasks(openapi()
                        .document(specDocument)
                        .operation("findPetsByStatus")
                        .parameters(Map.of("status", "available")))
                .build();
    }
}

4. Authentication (OAuth2)

Define authentication mechanisms for secure outbound communication.

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.oauth2;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.OAuth2AuthenticationData;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: an HTTP GET that carries an OAuth2 (client credentials) authentication policy.
 */
@ApplicationScoped
public class HttpOauth2Workflow extends Flow {

    /** Base URL of the secured server, read from the "wiremock.secure.url" configuration property. */
    @Inject
    @ConfigProperty(name = "wiremock.secure.url")
    String wiremockSecureUrl;

    /**
     * Builds the "oauth2-authentication-workflow" definition containing one {@code call}
     * task named "getPets".
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String petEndpoint = wiremockSecureUrl + "/v2/pet";
        final String authority = wiremockSecureUrl + "/realms/fake-authority";

        return FuncWorkflowBuilder.workflow("oauth2-authentication-workflow")
                .tasks(call("getPets",
                        http()
                                .GET()
                                // jq expression: the pet id comes from the workflow input
                                .query("petId", "${ .petId }")
                                // Client id and secret are inlined here as example values.
                                .uri(petEndpoint,
                                        oauth2(authority,
                                                OAuth2AuthenticationData.OAuth2AuthenticationDataGrant.CLIENT_CREDENTIALS,
                                                "workflow-runtime-id",
                                                "workflow-runtime-secret"))))
                .build();
    }
}

5. Event Consumption and Production

Integrate the workflow with CloudEvents: use listen to pause for events and emit to publish them.

Listening for an Event:

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: waiting for a CloudEvent before continuing with an HTTP call.
 */
@ApplicationScoped
public class ListenWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "listen-to-one-workflow" definition: a {@code listen} task followed by
     * a {@code call} task.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String startEndpoint = wiremockUrl + "/start";

        return FuncWorkflowBuilder.workflow("listen-to-one-workflow")
                .tasks(
                        // Execution pauses on this task until the engine receives the CloudEvent
                        listen("waitForStartup", toOne("race.started.v1")),
                        // After the event arrives, the workflow performs this POST
                        call("startup", post("", startEndpoint)))
                .build();
    }
}

Emitting an Event:

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: publishing an event from a workflow.
 */
@ApplicationScoped
public class EmitWorkflow extends Flow {

    /**
     * Builds the "emit-event-workflow" definition (namespace "org.acme", version "1.0")
     * containing one {@code emitJson} task named "orderPlaced".
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String eventType = "com.petstore.order.placed.v1";

        return FuncWorkflowBuilder.workflow("emit-event-workflow", "org.acme", "1.0")
                .tasks(emitJson("orderPlaced", eventType, Message.class))
                .build();
    }
}

6. Conditional Logic

Branch execution based on data state using switchCase(...), switchWhen(...), and switchWhenOrElse(...).

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: routing to one of two tasks depending on a predicate over the payload.
 */
@ApplicationScoped
public class ConditionalWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "conditional-routing" definition: a switch task that names
     * "approveTask" or "rejectTask", followed by those two POST tasks.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String approveEndpoint = wiremockUrl + "/approve";
        final String rejectEndpoint = wiremockUrl + "/reject";

        return FuncWorkflowBuilder.workflow("conditional-routing")
                .tasks(
                        // 1. Decide which branch to take: score >= 80 -> approve, otherwise reject
                        switchWhenOrElse((ScorePayload payload) -> payload.score() >= 80,
                                "approveTask", "rejectTask"),

                        // 2. Branch A (score of 80 or more); END plays the role of `break;`
                        //    in a switch case
                        post("approveTask", "", approveEndpoint)
                                .then(FlowDirectiveEnum.END),

                        // 3. Branch B (score below 80)
                        post("rejectTask", "", rejectEndpoint))
                .build();
    }
}

7. Iteration

Execute logic over collections of data using forEach(...).

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: running a nested task list for each element of a collection taken from the payload.
 */
@ApplicationScoped
public class ForEachWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "foreach-workflow" definition: a {@code forEach} over
     * {@code OrdersPayload::orders} whose body is a single POST task.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String processOrderEndpoint = wiremockUrl + "/process-order";

        return FuncWorkflowBuilder.workflow("foreach-workflow")
                .tasks(forEach(OrdersPayload::orders,
                        tasks(
                                // NOTE(review): "$item.id" is not wrapped in ${ ... } like the jq
                                // expressions used by the HTTP recipe - confirm it is evaluated
                                // per item rather than sent as a literal.
                                post("$item.id", processOrderEndpoint)
                                        .exportAsTaskOutput())))
                .build();
    }
}

8. Invoking Subflows

Execute another workflow using subflow(…​)

Ensure the name, namespace, and version parameters match those of the referenced workflow.

package org.acme;

// Static imports recommended for brevity:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: a parent workflow that runs two other workflows as subflows.
 */
@ApplicationScoped
public class ParentWorkflow extends Flow {

    /**
     * Builds the "parent-workflow-with-children" definition (namespace "org.acme",
     * version "1.0") with two {@code subflow} tasks, each written in a different style.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("parent-workflow-with-children", "org.acme", "1.0")
                .tasks(
                        // Style 1: the workflow(...) shortcut.
                        // NOTE(review): arguments here are ("org.acme", "http-with-query-headers", "1.0"),
                        // whereas FuncWorkflowBuilder.workflow(...) above takes the name first -
                        // confirm the shortcut's parameter order.
                        subflow("executeHttpWorkflow",
                                workflow("org.acme", "http-with-query-headers", "1.0")),

                        // Style 2: a Consumer<WorkflowTaskBuilder> that fills in the reference explicitly
                        subflow("emitEventSubflow",
                                ref -> ref.workflow()
                                        .withName("emit-event-workflow")
                                        .withNamespace("org.acme")
                                        .withVersion("1.0")))
                .build();
    }
}

9. Parallel Execution (fork)

Execute independent branches simultaneously to reduce total execution time.

Using fork(…​)

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;

/**
 * Recipe: a fork task with two HTTP POST branches.
 */
@ApplicationScoped
public class ParallelWorkflow extends Flow {

    /** Base URL of the target server, read from the "wiremock.url" configuration property. */
    @ConfigProperty(name = "wiremock.url")
    String wiremockUrl;

    /**
     * Builds the "parallel-workflow-using-branches" definition: one {@code fork} task named
     * "checkInventoryAndCredit" with the branches "checkInventory" and "checkCredit".
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        final String inventoryEndpoint = wiremockUrl + "/inventory-check";
        final String creditEndpoint = wiremockUrl + "/credit-check";

        return FuncWorkflowBuilder.workflow("parallel-workflow-using-branches")
                .tasks(fork("checkInventoryAndCredit",
                        // Branch 1: inventory check
                        http("checkInventory")
                                .method("POST")
                                .body("")
                                .endpoint(inventoryEndpoint),
                        // Branch 2: credit check
                        http("checkCredit")
                                .method("POST")
                                .body("")
                                .endpoint(creditEndpoint)))
                .build();
    }
}

10. Context-aware execution

Execute a workflow with access to engine metadata using withContext(...).

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withContext;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.impl.WorkflowContextData;

/**
 * Recipe: a function task that receives the workflow context alongside its input.
 */
@ApplicationScoped
public class ContextWorkflow extends Flow {

    /**
     * Builds the "context-aware" definition: one {@code withContext} task that prints the
     * instance id and returns "Processed " followed by its input.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("context-aware")
                .tasks(withContext(
                        (String payload, WorkflowContextData workflowContext) -> {
                            // The instance id is read from the context handed to the function
                            final String instanceLine = "Instance ID: " + workflowContext.instanceData().id();
                            System.out.println(instanceLine);
                            return "Processed " + payload;
                        },
                        String.class))
                .build();
    }
}

Execute a workflow with access to task metadata using withFilter(...).

package org.acme;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;

/**
 * Recipe: a function task that receives both the workflow context and the task context.
 */
@ApplicationScoped
public class TaskContextWorkflow extends Flow {

    /**
     * Builds the "task-context-workflow" definition: one {@code withFilter} task named
     * "taskAudit" that prints the task name and event name, then returns an audit string.
     *
     * @return the built workflow definition
     */
    @Override
    public Workflow descriptor() {
        return FuncWorkflowBuilder.workflow("task-context-workflow")
                .tasks(withFilter("taskAudit",
                        (ExampleEvent event,
                                WorkflowContextData workflowContext,
                                TaskContextData taskContext) -> {
                            // The task name comes from the task context passed to the function;
                            // the workflow context is part of the signature but unused here.
                            System.out.println("Local Task Name: " + taskContext.taskName());
                            System.out.println("Processing Message: " + event.eventName());

                            final String audit = "Audited [" + event.eventName() + "] via task: "
                                    + taskContext.taskName();
                            return audit;
                        },
                        ExampleEvent.class))
                .build();
    }
}

See also