Idempotency and Correlation
Distributed systems are full of duplicate messages, retries, and network hiccups. When a workflow calls an external service, emits an event, or listens for a callback, you must decide how to handle the case where the same message arrives more than once.
This page explains what Quarkus Flow guarantees for idempotency and correlation, what it leaves to the application layer, and provides practical patterns to build resilient retry-safe workflows.
1. What Quarkus Flow Guarantees
Quarkus Flow ensures that every workflow instance carries a globally unique identity that is propagated to external systems, giving them the tools they need to deduplicate on their own.
1.1 Unique Instance Identity
Every time a workflow starts, Quarkus Flow assigns it a unique instanceId (a ULID). This ID is the foundation of all idempotency guarantees:
// Every workflow instance has a globally unique, stable identifier
String instanceId = ctx.instanceData().id(); // e.g. "01K9GDCXJVN89V0N4CWVG40R7C"
1.2 HTTP Correlation Headers
By default, every HTTP or OpenAPI call made by a workflow includes correlation headers that downstream services can use as idempotency keys — if a workflow retries the same HTTP call due to a timeout, the downstream service receives the exact same headers and can recognize it as a duplicate.
For the full list of headers and configuration options, see Distributed Tracing via HTTP Headers.
1.3 CloudEvent Correlation Attributes
When a workflow emits a CloudEvent via the messaging bridge, the engine attaches correlation metadata (such as flowinstanceid and flowtaskid) as CloudEvent extension context attributes. Downstream consumers can read these attributes to correlate events back to the exact workflow and task that produced them.
For the full list of attributes, configuration options, and examples, see CloudEvent Correlation Headers.
1.4 State Persistence and Recovery
When state persistence is enabled, the engine saves the workflow’s execution state to a durable store after every completed task. If the JVM crashes, the engine can safely resume from the last completed checkpoint — never re-executing completed tasks.
This provides at-least-once task execution within the workflow: every task runs at least once, and the persisted checkpoint ensures forward progress without repeating already-committed work.
2. What Flow Does NOT Guarantee
Idempotency in a distributed system has hard limits. Quarkus Flow is transparent about what it cannot guarantee, so you can design your system accordingly.
2.1 No Server-Side Deduplication
The engine does not maintain a deduplication cache. If the same start event arrives twice (e.g., Kafka delivers the same message twice), Quarkus Flow will start two separate workflow instances, each with a different instanceId.
|
This is by design. The Serverless Workflow specification treats every incoming event as a unique trigger. Duplicate detection at the workflow-triggering layer is the responsibility of the messaging infrastructure or of an application-level deduplication layer. |
2.2 No Exactly-Once Execution
The engine guarantees at-least-once semantics for task execution, not exactly-once. This means:
-
An HTTP call might be retried (see fault tolerance), potentially succeeding more than once from the downstream service’s perspective.
-
A task that completes but whose state fails to persist before a crash will be re-executed on recovery.
To achieve exactly-once semantics, you must implement idempotency at the data receiver — your downstream APIs and databases.
2.3 No Cross-Instance Deduplication
Two distinct workflow instances (even of the same workflow definition) are completely independent. The engine does not correlate or deduplicate events across instances. If you need singleton semantics (only one instance of a workflow running at a time), you must implement it yourself, for example with a database lock or lease.
3. Practical Patterns
The patterns below show how to use Quarkus Flow’s correlation metadata to build robust idempotent systems.
3.1 HTTP Idempotency with Idempotency Key Header
When your downstream API needs to guard against duplicate processing, use a business key (such as an orderId) as the idempotency key — not the workflow instance ID. Different instances of the same workflow may process the same business entity, and using the instance ID as the idempotency key would allow duplicate charges or operations.
The workflow can pass the business key as an HTTP header, and the downstream service uses it to deduplicate:
Downstream service (pseudo-code):
@POST
@Path("/payments")
public Response processPayment(@HeaderParam("X-Order-Id") String orderId,
PaymentRequest request) {
// Check if we've already processed this order
if (paymentStore.exists(orderId)) {
// Return the previously stored result — no double-charge
return Response.ok(paymentStore.get(orderId)).build();
}
// Process the payment (this runs at most once per order)
PaymentResult result = paymentGateway.charge(request);
// Store the result keyed by the business key
paymentStore.save(orderId, result);
return Response.ok(result).build();
}
Workflow definition:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
public class PaymentWorkflow extends Flow {
public Workflow descriptor() {
return workflow("paymentWorkflow")
.tasks(
call("processPayment", http("/payments")
.method("POST")
.header("X-Order-Id", "${.order.id}")
.body(".paymentRequest"))
)
.build();
}
}
|
The automatic |
3.2 Idempotent PUT Requests
The HTTP PUT method is naturally idempotent — calling the same PUT request multiple times has the same effect as calling it once. When combined with Quarkus Flow’s correlation headers, PUT endpoints (when implemented correctly) become safe to retry without additional infrastructure.
Workflow:
package org.acme;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
import jakarta.enterprise.context.ApplicationScoped;
import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
@ApplicationScoped
public class UpdateInventoryWorkflow extends Flow {
@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("updateInventory")
.tasks(
call("updateStock", http("/inventory/{sku}")
.method("PUT")
.body(".stockUpdate")))
.build();
}
}
The downstream endpoint simply implements PUT semantics — replacing the resource at the given URL. Retries from the workflow’s fault tolerance mechanism are safe because PUT is idempotent by definition.
3.3 Event Deduplication Table
When consuming events from a message broker (e.g., Kafka), the broker may deliver the same event multiple times. Use a deduplication table to ensure each event triggers at most one workflow instance.
Deduplication table (SQL):
CREATE TABLE event_deduplication (
event_id VARCHAR(255) PRIMARY KEY,
workflow_instance_id VARCHAR(255),
processed_at TIMESTAMP NOT NULL
);
Event consumer with deduplication check:
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ApplicationScoped
public class DeduplicatingEventConsumer {
@Inject
DataSource dataSource;
@Transactional
public boolean tryProcess(String eventId) {
try (Connection conn = dataSource.getConnection()) {
// Atomically insert — if the key exists, this is a duplicate
PreparedStatement ps = conn.prepareStatement(
"INSERT INTO event_deduplication (event_id, processed_at) VALUES (?, NOW())");
ps.setString(1, eventId);
try {
ps.executeUpdate();
return true; // First time seeing this event
} catch (java.sql.SQLIntegrityConstraintViolationException e) {
return false; // Duplicate event, skip
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Usage before starting a workflow:
// In your event listener or REST endpoint:
if (deduplicator.tryProcess(event.id())) {
workflowApplication.start(definition, event.payload());
} else {
// Log and discard the duplicate
}
|
This pattern relies on a |
3.4 Outbox Pattern Using Task Listeners
The outbox pattern ensures that workflow-triggered side effects (e.g., sending an email, updating a ledger, publishing a message) are guaranteed to happen at least once without risking inconsistent state.
The idea: instead of performing the side effect directly in a task, write the intent to an "outbox" table. A separate process reads the outbox table and delivers the side effects reliably.
Workflow with outbox writer:
package org.acme;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
import java.util.Map;
import jakarta.enterprise.context.ApplicationScoped;
import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
@ApplicationScoped
public class OrderWorkflow extends Flow {
@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("orderWorkflow")
.tasks(
call("queueNotification", http("/outbox/notifications")
.method("POST")
.body(Map.of("to", "${.customer.email}", "subject", "Order confirmed", "orderId", "${.id}"))))
.build();
}
}
A separate outbox relay reads and delivers:
import jakarta.enterprise.context.ApplicationScoped;
import io.quarkus.scheduler.Scheduled;
@ApplicationScoped
public class OutboxRelay {
@Scheduled(every = "5s")
void deliverOutboxMessages() {
// 1. Read unprocessed messages from the outbox table
// 2. Attempt delivery (email, HTTP call, Kafka, etc.)
// 3. Mark as processed on success
// 4. Implement retry with backoff for failures
}
}
For stronger guarantees, combine the outbox pattern with custom workflow listeners to capture task completions and write to the outbox as part of the engine’s lifecycle:
package org.acme;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
@ApplicationScoped
public class OutboxExecutionListener implements WorkflowExecutionListener {
@Inject
OutboxWriter outboxWriter;
@Override
public void onTaskCompleted(TaskCompletedEvent ev) {
// After certain tasks complete, write a side-effect intent to the outbox
String taskName = ev.taskContext().taskName();
if ("queueNotification".equals(taskName)) {
outboxWriter.write(
ev.workflowContext().instanceId(),
"send-email",
ev.workflowContext().currentData()
);
}
}
}
This approach ensures the outbox entry is written after the task completes successfully, as part of the workflow lifecycle, eliminating the window where a task succeeds but the side effect is lost.
3.5 Correlating Callbacks to a Waiting Instance
This is the pattern you want when a workflow pauses on a listen(…) task and an external system must later resume that exact workflow instance.
Important distinction: this is not the Serverless Workflow spec’s event-to-event correlation model. Here, the goal is to route an incoming callback to a specific waiting workflow instance.
Quarkus Flow supports multiple strategies for correlating callbacks to a waiting instance:
Correlation via CloudEvent Extension
The most common approach: the workflow instance ID is propagated as a CloudEvent extension attribute.
The practical pattern is:
-
The workflow emits a request event.
-
Quarkus Flow adds
flowinstanceidto that outbound CloudEvent. -
The external system stores that instance ID alongside its business record.
-
When the callback arrives, the external system includes the same instance ID on the event it sends back to
flow-in. -
The engine uses that metadata to wake the matching paused instance.
This lets you build resume/callback flows without inventing your own instance lookup table in the workflow itself.
Workflow side:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
public class NewsletterWorkflow extends Flow {
public Workflow descriptor() {
return workflow("intelligent-newsletter")
.tasks(
emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class),
listen("waitHumanReview",
toOne(consumed("org.acme.newsletter.review.done")
.extensionByInstanceId("flowinstanceid"))))
.build();
}
}
Callback side:
@PUT
@Path("/newsletter")
public Response sendReview(HumanReview review, @HeaderParam("X-Flow-Instance-Id") String instanceId)
throws JsonProcessingException {
byte[] body = objectMapper.writeValueAsBytes(review);
CloudEvent ce = CloudEventBuilder.v1().withId(UUID.randomUUID().toString())
.withExtension("flowinstanceid", instanceId)
.withSource(URI.create("api:/newsletter")).withType("org.acme.newsletter.review.done")
.withDataContentType("application/json").withData(body).build();
flowIn.send(CE_JSON.serialize(ce));
return Response.accepted().build();
}
|
The callback request should carry the original If your callback payload already contains a business correlation key, keep it too. The workflow instance ID is the routing key for the paused workflow, while your business key is the domain identifier for your own records. |
Correlation via CloudEvent Data Fields
When your external system already uses a business identifier in the event payload (e.g., an orderId, customerId, or ticketId), you can correlate callbacks directly from the CloudEvent data — no need to rely on extension attributes.
The consumed(…) DSL provides several data-based filtering methods:
-
dataByInstanceId("fieldName")— matches a field in the CloudEvent data against the workflow instance ID. -
dataAs(MyEvent.class, predicate)— filters using a typed Java predicate over the deserialized CloudEvent data. Use this to match on any business key in your event model. -
dataAsMap(predicate)— filters using aMap<String, Object>predicate, useful when you don’t have a strongly-typed model. -
dataFields("field1", "field2")— matches specific fields from the CloudEvent data against the same fields in the workflow context, enabling content-based correlation.
Example: correlating by a business key in the event data:
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
public class OrderApprovalWorkflow extends Flow {
public Workflow descriptor() {
return workflow("order-approval")
.tasks(
emitJson("requestApproval", "org.acme.order.approval.requested", Order.class),
listen("waitApproval",
toOne(consumed("org.acme.order.approval.done")
.dataAs(Order.class, (order, wfCtx, taskCtx) -> {
// Match the orderId in the callback to the orderId in our workflow context
Order current = (Order) wfCtx.currentData();
return order.orderId().equals(current.orderId());
})))
)
.build();
}
}
This pattern is especially useful when your callback events come from systems that don’t know about flowinstanceid — they only carry the business key that both sides agree on.
The listen("taskName", toOne("event.type")) DSL line still defines the event wait itself. The instance correlation comes from the callback contract plus the instance-aware event filter.
See also
-
Enable distributed tracing — automatic HTTP header propagation for correlation.
-
CloudEvent Correlation Headers — correlation metadata on emitted events.
-
Fault tolerance and resilience — retries and circuit breakers for HTTP tasks.
-
Configure state persistence — durable storage for workflow recovery.
-
Listen to workflow events — lifecycle hooks for implementing the outbox pattern.
-
Durable Workflows in Kubernetes — lease-based worker identity for reliable recovery.