How to transform data between workflow tasks
This guide shows you how to control data flow between tasks in a workflow using inputFrom, exportAs, and outputAs transformations.
1. What you’ll build
You’ll create a conference paper submission workflow that:
- Receives a paper proposal submission
- Validates and scores the proposal
- Enriches the data with submission metadata
- Sends a notification with the review result
This demonstrates the three key data transformation methods:
- inputFrom – to shape what each task receives as input
- exportAs – to control what gets persisted in the workflow data
- outputAs – to pass enriched data between tasks without modifying workflow state
Note: For conceptual background on data flow, see Data Flow and Transformations. For a quick syntax reference, see Quick Reference.
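One way to picture the three transformations listed above is as a small pipeline wrapped around each task. The sketch below is a plain-Java mental model, not Quarkus Flow code: runStep and StepResult are hypothetical names, and it assumes that exportAs is evaluated on the task's already-transformed output, as the Serverless Workflow data flow model describes.

```java
import java.util.function.Function;

class TransformationModel {

    /** What one task hands on: the next task's input and the value stored in the workflow context. */
    record StepResult<O, C>(O nextInput, C context) {}

    /**
     * Hypothetical helper that applies the transformations in order.
     * It only models the data flow; it is not part of any workflow library.
     */
    static <R, I, T, O, C> StepResult<O, C> runStep(
            R rawInput,
            Function<R, I> inputFrom,
            Function<I, T> task,
            Function<T, O> outputAs,
            Function<O, C> exportAs) {
        I taskInput = inputFrom.apply(rawInput);   // shape what the task receives
        T taskResult = task.apply(taskInput);      // run the task itself
        O output = outputAs.apply(taskResult);     // shape what the next task receives
        C context = exportAs.apply(output);        // derive what is kept in the context
        return new StepResult<>(output, context);
    }

    public static void main(String[] args) {
        StepResult<String, Integer> result = runStep(
                "  hello  ",
                (String s) -> s.trim(),
                (String s) -> s.toUpperCase(),
                (String s) -> s + "!",
                (String s) -> s.length());
        System.out.println(result.nextInput()); // prints HELLO!
        System.out.println(result.context());   // prints 6
    }
}
```

The point of the model is that nextInput and context are separate values: changing the outputAs function alters what flows to the next task, while the exportAs function alone decides what is stored.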
2. Prerequisites
- A Quarkus Flow project (see Getting Started)
- Basic understanding of Java records and functional programming
3. Step 1: Define your data types
Create the records that represent data at different stages of the workflow:
/**
* External DTO received from the API
*/
public record ProposalSubmission(String title, String proposal, String author) {
}
/**
* Internal domain model used for processing
*/
public record Proposal(String title, String abstractText, String author) {
}
/**
* Scoring result persisted in workflow data
*/
public record ProposalScore(long score, boolean accepted) {
}
/**
* Final notification payload enriched with data from multiple sources
*/
public record NotificationPayload(String title, String author, long score, boolean accepted) {
}
4. Step 2: Create the workflow with inputFrom
Start by creating a workflow that transforms the input submission into your domain model:
@ApplicationScoped (1)
public class Call4PapersFlow extends Flow { (2)
@ConfigProperty(name = "notification.service.base-url")
String baseUrl;
@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("call4papers")
.tasks(
// Step 1: Validate proposal with inputFrom transformation
function("validateProposal", (Proposal input) -> {
String proposalTitle = input.title();
if (proposalTitle == null || proposalTitle.isBlank()) {
throw new IllegalArgumentException("Title is required");
}
return input;
}, Proposal.class)
.inputFrom((ProposalSubmission submission) -> new Proposal(
submission.title(),
submission.proposal(), // Maps to abstractText
submission.author()), ProposalSubmission.class),
| 1 | Add CDI annotation to make the flow injectable |
| 2 | Extend Flow to define a workflow descriptor |
In the validateProposal task, we expect a Proposal as input, but the workflow receives a ProposalSubmission. Use inputFrom to transform the submission into the proposal domain model.
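Because the inputFrom mapping is a pure function, it can be checked without starting a workflow. The following is a minimal sketch that copies the mapping from the snippet above into a named constant; TO_PROPOSAL is a hypothetical name, and whether such a constant can be passed directly to inputFrom depends on the parameter type the DSL declares, which this guide does not show.

```java
import java.util.function.Function;

class InputFromMapping {

    record ProposalSubmission(String title, String proposal, String author) {}

    record Proposal(String title, String abstractText, String author) {}

    // Hypothetical constant holding the same mapping used in the inputFrom lambda.
    static final Function<ProposalSubmission, Proposal> TO_PROPOSAL =
            submission -> new Proposal(
                    submission.title(),
                    submission.proposal(), // maps to abstractText
                    submission.author());

    public static void main(String[] args) {
        ProposalSubmission submission = new ProposalSubmission(
                "Reactive Workflows with Quarkus",
                "This paper explores reactive workflow patterns...",
                "Jane Developer");
        Proposal proposal = TO_PROPOSAL.apply(submission);
        // The external field "proposal" ends up in the internal field "abstractText".
        System.out.println(proposal.abstractText());
    }
}
```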
5. Step 3: Add scoring with outputAs
Add a task that scores the proposal and uses outputAs to pass transformed data to the next step:
// Step 1: Validate proposal with inputFrom transformation
function("validateProposal", (Proposal input) -> {
String proposalTitle = input.title();
if (proposalTitle == null || proposalTitle.isBlank()) {
throw new IllegalArgumentException("Title is required");
}
return input;
}, Proposal.class)
.inputFrom((ProposalSubmission submission) -> new Proposal(
submission.title(),
submission.proposal(), // Maps to abstractText
submission.author()), ProposalSubmission.class),
// Step 2: Score proposal with outputAs transformation
function("scoreProposal", (Proposal input) -> { (1)
Integer score = calculateScore(input.abstractText());
System.out.println("Score calculated having the result as: " + score);
return score;
}, Proposal.class)
.outputAs((Integer score) -> new ProposalScore(score, score >= 7)), (2)
| 1 | Task receives the output from the previous task (a Proposal) |
| 2 | Use outputAs to transform the task result (Integer score) into a ProposalScore and pass it to the next step |
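The outputAs mapping above encodes the acceptance rule (a score of 7 or higher is accepted). As a sketch, the same rule can be isolated and exercised at its boundary; ACCEPTANCE_THRESHOLD and TO_PROPOSAL_SCORE are hypothetical names introduced for illustration, while the value 7 comes from the snippet.

```java
import java.util.function.Function;

class OutputAsMapping {

    record ProposalScore(long score, boolean accepted) {}

    // Hypothetical constant name; the value 7 is the one used in the guide.
    static final int ACCEPTANCE_THRESHOLD = 7;

    // Same logic as the outputAs lambda: wrap the raw score and derive the decision.
    // The Integer is unboxed and widened to long by the record constructor call.
    static final Function<Integer, ProposalScore> TO_PROPOSAL_SCORE =
            score -> new ProposalScore(score, score >= ACCEPTANCE_THRESHOLD);

    public static void main(String[] args) {
        System.out.println(TO_PROPOSAL_SCORE.apply(7)); // prints ProposalScore[score=7, accepted=true]
        System.out.println(TO_PROPOSAL_SCORE.apply(5)); // prints ProposalScore[score=5, accepted=false]
    }
}
```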
6. Step 4: Enrich data and commit with exportAs
Add a task that prepares the notification by enriching the score with data from the original workflow input, then uses exportAs to commit the result to the workflow context:
// Step 3: Prepare notification with exportAs using workflow context
function("prepareNotification", Function.identity(), ProposalScore.class) (1)
.exportAs((object, workflowContext, taskContextData) -> { (2)
ProposalScore taskOutput = output(taskContextData, ProposalScore.class); (3)
ProposalSubmission submission = FuncDSL.input(workflowContext, (4)
ProposalSubmission.class);
return new NotificationPayload( (5)
submission.title(),
submission.author(),
taskOutput.score(),
taskOutput.accepted());
}),
// Step 4: Send notification via HTTP
http("sendNotification")
.POST()
.body("${ $context }") (6)
.header("Content-Type", "application/json")
.uri(URI.create(baseUrl + "/notifications")))
| 1 | Just pass the ProposalScore through without transformation (identity function) |
| 2 | Use exportAs with JavaFilterFunction to commit enriched data to workflow data |
| 3 | Use FuncDSL.output() to get type-safe access to the current task’s output |
| 4 | Use FuncDSL.input() to get type-safe access to original workflow input |
| 5 | Create enriched payload combining data from multiple sources |
| 6 | HTTP task receives the committed NotificationPayload from workflow data. The body will be the NotificationPayload as JSON |
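The enrichment inside the exportAs lambda combines two sources: the original submission and the score produced by the task. A minimal sketch of that combination as a standalone method follows; toPayload is a hypothetical helper, not something the guide or the library defines.

```java
class NotificationEnrichment {

    record ProposalSubmission(String title, String proposal, String author) {}

    record ProposalScore(long score, boolean accepted) {}

    record NotificationPayload(String title, String author, long score, boolean accepted) {}

    /** Hypothetical helper: merges the original workflow input with the task output. */
    static NotificationPayload toPayload(ProposalSubmission submission, ProposalScore score) {
        return new NotificationPayload(
                submission.title(),
                submission.author(),
                score.score(),
                score.accepted());
    }

    public static void main(String[] args) {
        ProposalSubmission submission = new ProposalSubmission(
                "Reactive Workflows with Quarkus",
                "This paper explores reactive workflow patterns...",
                "Jane Developer");
        System.out.println(toPayload(submission, new ProposalScore(7, true)));
        // prints NotificationPayload[title=Reactive Workflows with Quarkus, author=Jane Developer, score=7, accepted=true]
    }
}
```

With such a helper in place, the lambda body would only need to fetch the two values with FuncDSL.input() and FuncDSL.output() and pass them on, which keeps the field mapping testable on its own.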
7. Step 5: Complete example
Here’s the complete workflow with all transformations:
package org.acme.dataflow;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.output;
import java.net.URI;
import java.util.function.Function;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.fluent.func.dsl.FuncDSL;
@ApplicationScoped
public class Call4PapersFlow extends Flow {
@ConfigProperty(name = "notification.service.base-url")
String baseUrl;
@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("call4papers")
.tasks(
// Step 1: Validate proposal with inputFrom transformation
function("validateProposal", (Proposal input) -> {
String proposalTitle = input.title();
if (proposalTitle == null || proposalTitle.isBlank()) {
throw new IllegalArgumentException("Title is required");
}
return input;
}, Proposal.class)
.inputFrom((ProposalSubmission submission) -> new Proposal(
submission.title(),
submission.proposal(), // Maps to abstractText
submission.author()), ProposalSubmission.class),
// Step 2: Score proposal with outputAs transformation
function("scoreProposal", (Proposal input) -> {
Integer score = calculateScore(input.abstractText());
System.out.println("Score calculated having the result as: " + score);
return score;
}, Proposal.class)
.outputAs((Integer score) -> new ProposalScore(score, score >= 7)),
// Step 3: Prepare notification with exportAs using workflow context
function("prepareNotification", Function.identity(), ProposalScore.class)
.exportAs((object, workflowContext, taskContextData) -> {
ProposalScore taskOutput = output(taskContextData, ProposalScore.class);
ProposalSubmission submission = FuncDSL.input(workflowContext,
ProposalSubmission.class);
return new NotificationPayload(
submission.title(),
submission.author(),
taskOutput.score(),
taskOutput.accepted());
}),
// Step 4: Send notification via HTTP
http("sendNotification")
.POST()
.body("${ $context }")
.header("Content-Type", "application/json")
.uri(URI.create(baseUrl + "/notifications")))
.build();
}
/**
* Calculate a score for the proposal based on its abstract.
* In a real implementation, this might use NLP, keyword analysis, etc.
*/
private Integer calculateScore(String abstractText) {
// Simple scoring: longer abstracts get higher scores
int length = abstractText.length();
if (length > 500)
return 9;
if (length > 300)
return 7;
if (length > 150)
return 5;
return 3;
}
/**
* External DTO received from the API
*/
public record ProposalSubmission(String title, String proposal, String author) {
}
/**
* Internal domain model used for processing
*/
public record Proposal(String title, String abstractText, String author) {
}
/**
* Scoring result persisted in workflow data
*/
public record ProposalScore(long score, boolean accepted) {
}
/**
* Final notification payload enriched with data from multiple sources
*/
public record NotificationPayload(String title, String author, long score, boolean accepted) {
}
}
8. Testing the workflow
Start a workflow instance with a submission:
@Inject
Call4PapersFlow flow;
public void submitProposal() {
var submission = new ProposalSubmission(
"Reactive Workflows with Quarkus",
"This paper explores reactive workflow patterns...",
"Jane Developer"
);
flow.startInstance(submission).await().indefinitely();
}
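To know what result to expect from the sample submission above, the scoring rules can be traced in plain Java. The sketch below copies calculateScore and the acceptance rule from the complete example; the class name SampleTrace and the helper score are hypothetical. The sample abstract is only 49 characters long, so it falls into the lowest scoring band and is rejected.

```java
class SampleTrace {

    record ProposalScore(long score, boolean accepted) {}

    // Copied from the guide: longer abstracts get higher scores.
    static int calculateScore(String abstractText) {
        int length = abstractText.length();
        if (length > 500)
            return 9;
        if (length > 300)
            return 7;
        if (length > 150)
            return 5;
        return 3;
    }

    // Hypothetical helper combining the scoring step with the outputAs rule (score >= 7).
    static ProposalScore score(String abstractText) {
        int score = calculateScore(abstractText);
        return new ProposalScore(score, score >= 7);
    }

    public static void main(String[] args) {
        String sampleAbstract = "This paper explores reactive workflow patterns...";
        System.out.println(score(sampleAbstract));  // prints ProposalScore[score=3, accepted=false]
        System.out.println(score("x".repeat(301))); // prints ProposalScore[score=7, accepted=true]
    }
}
```

An abstract needs more than 300 characters to be accepted under these rules, so a test that expects an accepted notification should use a longer abstract than the sample.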
9. Key takeaways
- Use inputFrom to give tasks a focused, typed view of the data they need
- Use exportAs to control what gets persisted in the workflow data document
- Use outputAs to pass data to the next step without committing to workflow data
- Use FuncDSL.input(workflowContext, Type.class) for type-safe access to the original workflow input
- Use FuncDSL.output(taskContext, Type.class) for type-safe access to the current task’s output
- Access workflow and task context in transformations to combine data from multiple sources
10. Next steps
- Learn about context-aware transformations with JavaFilterFunction in Data Flow and Transformations
- Explore JQ expressions as an alternative to Java functions
- See Quick Reference for all transformation method signatures
11. Related guides
- Data Flow and Transformations – Conceptual overview
- Java DSL Cheatsheet – Quick syntax reference
- Testing and Debugging – How to test workflows