# Data flow and transformations
Workflows in Quarkus Flow (and in the CNCF Workflow Specification) operate on a single logical data document – a JSON-like tree.
Each task can:
- Select what part of that document it sees as input.
- Shape what it exports to the next step.
- Commit what it writes back into workflow data.
This page explains how that maps to the CNCF Workflow data flow model and how to use the Java DSL helpers:
- `inputFrom(…)`
- `exportAs(…)`
- `outputAs(…)`
## 1. Mapping CNCF Workflow data flow → Java DSL
The CNCF Workflow data flow model introduces the ideas of:
- Input – what a state or task receives.
- Export – what a state exposes to downstream consumers / events.
- Output – what is written back to the workflow data.
In the Java Func DSL, these concepts map to:
| Spec concept | YAML field | Java DSL helper | Effect |
|---|---|---|---|
| Input | `input.from` | `inputFrom(…)` | Choose what the task sees as input. |
| Export | `export.as` | `exportAs(…)` | Shape what’s forwarded to the next step / event without committing to global data. |
| Output | `output.as` | `outputAs(…)` | Shape what’s written back into workflow data. |
All three helpers are optional. If you don’t set them, the engine uses sensible defaults (task input = workflow data, output merged back as-is).
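To see how the three concepts line up in one place, here is a minimal sketch that chains all three helpers on a single task, reusing the patterns shown later on this page (the step name, the `InvestmentMemo.summary()` accessor and the jq path are illustrative, and it assumes the helpers can be chained on the same task builder):

```java
agent("investmentAnalyst", analyst::analyse, InvestmentMemo.class)
    // Input: the agent only sees the "request" subtree of workflow data (input.from)
    .inputFrom("$.request")
    // Export: forward a narrow payload to the next step / event (export.as)
    .exportAs(memo -> Map.of("summary", memo.summary()), InvestmentMemo.class)
    // Output: commit the memo under $.memo in workflow data (output.as)
    .outputAs(memo -> Map.of("memo", memo), InvestmentMemo.class);
```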
## 2. Three layers of transformations
For any task you typically decide three things:
- What input it should see (`inputFrom`).
- What part of its result should be exported (`exportAs`).
- What should be committed to workflow data (`outputAs`).
Each of these can be expressed as:
- a jq expression (`String`), or
- a Java function, including the context-aware variants:
  - `JavaContextFunction<T,R>` – sees workflow context.
  - `JavaFilterFunction<T,R>` – sees workflow + task context.

The overloads all eventually populate the spec fields described earlier.
### 2.1 `inputFrom` – shape the task input
Without transformations, a task sees the whole workflow data as its input.
Use `inputFrom` to give it a smaller, focused view:
```java
// Only pass the "seed" field into this task:
set("normalizeInput")
    .inputFrom("$.seed");
```
With Java code:
```java
// T is the current workflow data / task input
inputFrom((InvestmentRequest in) ->
        Map.of("ticker", in.ticker().toUpperCase()),
    InvestmentRequest.class);
```
For context-aware filters:
```java
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.TaskContextData;

// T = InvestmentRequest, R = Map<String,Object>
inputFrom((InvestmentRequest in,
           WorkflowContextData wf,
           TaskContextData task) -> Map.of(
        "ticker", in.ticker(),
        "objective", in.objective(),
        "taskPos", task.position().jsonPointer() // e.g. /do/0/task
    ), InvestmentRequest.class);
```
Here you can:
- Inspect the workflow context (`wf`) – ids, metadata, current data.
- Inspect the task context (`task`) – position, name, etc.
This is handy for logging, debugging, or adding metadata without polluting your business types.
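For example, a minimal sketch of a pass-through filter that keeps the business input intact and only attaches diagnostic metadata (the wrapper field names are illustrative; `task.position().jsonPointer()` is the accessor used above):

```java
// Pass the request through unchanged and add a small "trace" block for logging.
inputFrom((InvestmentRequest in,
           WorkflowContextData wf,
           TaskContextData task) -> Map.of(
        "request", in,                               // business input, untouched
        "trace", Map.of(
            "taskPos", task.position().jsonPointer() // where this task sits in the workflow
        )
    ), InvestmentRequest.class);
```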
### 2.2 `exportAs` – pass data to the next step
`exportAs` controls what the task exports for downstream consumers (the next task, events, etc.) without immediately updating global workflow data.
Think of it as a pipe between steps:
```java
// Draft -> export only the text, not the whole object
agent("draftNewsletter", drafter::draft, Draft.class)
    .exportAs(draft -> Map.of(
        "draftText", draft.text(),
        "quality", draft.quality()
    ), Draft.class);

// Critic sees a narrow view shaped by exportAs(...)
agent("criticNewsletter", critic::critique, Review.class);
```
Unlike `outputAs`, this doesn’t decide what ends up in the workflow data document, only what is forwarded as the exported payload.
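To make the difference concrete, here is a sketch using both helpers on the same drafting step (assuming both can be set on one task builder; the committed field name is illustrative):

```java
agent("draftNewsletter", drafter::draft, Draft.class)
    // Forwarded to the next step only – not committed to workflow data
    .exportAs(draft -> Map.of("draftText", draft.text()), Draft.class)
    // Committed under $.draft in the workflow data document
    .outputAs(draft -> Map.of("draft", draft), Draft.class);
```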
Use it when:
- You want tight coupling between adjacent steps.
- You want to feed a clean payload into an event (`emitJson(…)`), while controlling what gets exposed.
> **Note:** The exported payload becomes part of the workflow context; that context is available to adjacent and later steps.
### 2.3 `outputAs` – commit results to workflow data
`outputAs` controls what gets written back into the workflow data document.
In many cases you want the task result as-is; in others you want to:

- rename fields,
- drop internal details, or
- construct a composite DTO.
Simple variant:
```java
// Persist a memo under $.memo
agent("investmentAnalyst", analyst::analyse, InvestmentMemo.class)
    .outputAs(memo -> Map.of("memo", memo), InvestmentMemo.class);
```
Context-aware variant (similar to the agentic HTTP example):
get("fetchMarketData", "http://localhost:8081/market-data/{ticker}")
.outputAs((MarketDataSnapshot snapshot,
WorkflowContextData wf,
TaskContextData task) -> {
// Original task input as seen by this step
var input = task.input().asMap().orElseThrow();
// Raw HTTP body before any shaping
var rawBody = task.rawOutput().asText().orElseThrow();
return new InvestmentPrompt(
snapshot.ticker(),
input.get("objective").toString(),
input.get("horizon").toString(),
rawBody
);
}, MarketDataSnapshot.class);
Here:
- `snapshot` is the typed task result (`MarketDataSnapshot`).
- `wf` lets you inspect current workflow data / metadata.
- `task` gives you:
  - `task.input()` – what this task saw as input (after `inputFrom`).
  - `task.rawOutput()` – the raw result before `outputAs`.
  - `task.position()` – including `jsonPointer()`.

The `outputAs` return value becomes the committed output for this task in the workflow data.
## 3. Which one should I use?
A practical rule of thumb:
- Use `inputFrom` when:
  - you want to keep the task insulated from changes in the global data shape,
  - or you want to pass a small, explicit input record.
- Use `exportAs` when:
  - you want to pipe data between adjacent steps,
  - or you are shaping a payload for events / downstream services.
- Use `outputAs` when:
  - you are deciding what should live in the workflow data document,
  - or you want neat, testable state at the end of the workflow (what Dev UI shows as Output).
In many workflows you will only need `inputFrom` and `outputAs`. `exportAs` is most useful in more advanced pipelines and event-driven flows.
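As a sketch of that common case (step names and the `reviewer::review` method reference are illustrative), two adjacent tasks can hand data over through the workflow data document alone:

```java
// First step commits its result under $.memo ...
agent("investmentAnalyst", analyst::analyse, InvestmentMemo.class)
    .outputAs(memo -> Map.of("memo", memo), InvestmentMemo.class);

// ... and the next step reads only that slice as its input.
agent("reviewMemo", reviewer::review, Review.class)
    .inputFrom("$.memo");
```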
## 4. JQ vs Java functions
All three helpers have two “families” of overloads:
- JQ expressions (string):

  ```java
  inputFrom("$.customer");
  outputAs("$.result");
  ```

  Good when you already think in terms of JSON paths and you’re transforming purely structural data.

- Java functions:

  ```java
  // Simple function
  outputAs((MyResult r) -> Map.of("value", r.value()), MyResult.class);

  // Context-aware with workflow + task
  outputAs((MyResult r, WorkflowContextData wf, TaskContextData t) -> { ... }, MyResult.class);
  ```

  Good when:

  - you already have domain types (records, POJOs),
  - you want to reuse Java logic or validation, or
  - you need **context** (workflow instance id, task position, etc.).
You can freely mix JQ and Java across tasks in the same workflow.
## 5. Quick reference
- `inputFrom(String jq)` – slice workflow data using JQ.
- `inputFrom(Function<T,R>, Class<T>)` – map input with plain Java.
- `inputFrom(JavaContextFunction<T,R>, Class<T>)` – Java + workflow context.
- `inputFrom(JavaFilterFunction<T,R>, Class<T>)` – Java + workflow + task context.
- `exportAs(Function/JavaContextFunction/JavaFilterFunction, Class<T>)` – shape the exported result for the next step / event (no direct commit to workflow data).
- `outputAs(String jq)` – project task result into workflow data via JQ.
- `outputAs(Function/JavaContextFunction/JavaFilterFunction, Class<T>)` – shape what is written back into workflow data.
For a compact operator list and other DSL shortcuts, see Java DSL cheatsheet.