Quarkus Temporal
A Quarkus extension that lets you use Temporal to orchestrate both mission-critical and mainstream workloads.
Installation
To use this extension, first add the io.quarkiverse.temporal:quarkus-temporal extension to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
<groupId>io.quarkiverse.temporal</groupId>
<artifactId>quarkus-temporal</artifactId>
<version>0.0.9</version>
</dependency>
Getting Started
Activities and workflows are detected automatically. Simply implement an interface annotated with @ActivityInterface or @WorkflowInterface.
Create a Workflow
@WorkflowInterface
public interface SendEmailWorkflow {
@WorkflowMethod
public void run(WorkflowData data);
@QueryMethod
public EmailDetails details();
}
The following workflow definition will be automatically bound to the default worker:
public class SendEmailWorkflowImpl implements SendEmailWorkflow {
@Override
public void run(WorkflowData data) {
}
}
It is possible to associate the workflow with one or more named workers instead by annotating it with @TemporalWorkflow:
@TemporalWorkflow(workers = "named-worker")
public class SendEmailWorkflowImpl implements SendEmailWorkflow {
@Override
public void run(WorkflowData data) {
}
}
If you don’t have control over the workflow class, you can also bind it to a worker using the workflow-classes configuration property of the worker:
quarkus.temporal.worker.namedWorker.workflow-classes[0]=io.quarkiverse.temporal.SendEmailWorkflowImpl
In this case, it will not be associated with the default worker unless you also bind it explicitly:
quarkus.temporal.worker.workflow-classes[0]=io.quarkiverse.temporal.SendEmailWorkflowImpl
Each worker can have at most one implementation of a given workflow, but a workflow can have implementations across multiple workers.
Workflows are not provided as CDI beans because dependency injection into workflow instances is strongly discouraged. Injecting dependencies into workflow instances can lead to changes that are incompatible with persisted histories, resulting in NonDeterministicException errors. To provide external configuration to a workflow in a deterministic way, use a Local Activity that returns the configuration to the workflow. Dependency injection into activity instances is allowed, ensuring that the configuration is persisted into the history and remains consistent during replay.
If your project contains only the workflow interfaces and not the actual workflow implementations, the extension assumes that the workflow is bound to the default worker. If this is not the case, you can tell Quarkus which workers the workflow is bound to by annotating the interface directly with @TemporalWorkflow:
@WorkflowInterface
@TemporalWorkflow(workers = "named-worker")
public interface SendEmailWorkflow {
@WorkflowMethod
public void run(WorkflowData data);
@QueryMethod
public EmailDetails details();
}
Create an Activity
@ActivityInterface
public interface SendEmailActivities {
@ActivityMethod
public String sendEmail(EmailDetails details);
}
The following activity definition will be added automatically to the default worker:
public class SendEmailActivitiesImpl implements SendEmailActivities {
@Inject (1)
Mailer mailer;
@Override
public String sendEmail(EmailDetails details) {
}
}
(1) CDI dependency injection is allowed in activity definitions.
It is possible to associate the activity with one or more named workers instead by annotating it with @TemporalActivity:
@TemporalActivity(workers = "named-worker")
public class SendEmailActivitiesImpl implements SendEmailActivities {
@Override
public String sendEmail(EmailDetails details) {
}
}
If you don’t have control over the activity class, you can also bind it to a worker using the activity-classes configuration property of the worker:
quarkus.temporal.worker.namedWorker.activity-classes[0]=io.quarkiverse.temporal.SendEmailActivitiesImpl
In this case, it will not be associated with the default worker unless you also bind it explicitly:
quarkus.temporal.worker.activity-classes[0]=io.quarkiverse.temporal.SendEmailActivitiesImpl
Similarly, each worker can have at most one implementation of a given activity, but an activity can have implementations across multiple workers.
Using the client
public class MyService {
@Inject
WorkflowClient client; (1)
public void startSubscription(WorkflowData data) {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setWorkflowId(data.getEmail())
.setTaskQueue("<default>") (2)
.build();
SendEmailWorkflow workflow = client.newWorkflowStub(SendEmailWorkflow.class, options);
WorkflowClient.start(workflow::run, data);
}
}
(1) The client can be injected as a CDI bean.
(2) The task queue of the default worker is <default>. For a named worker, use the name of the worker.
Stub Injection
It is also possible to inject a workflow stub directly using the TemporalWorkflowStub qualifier:
public class MyService {
@Inject
@TemporalWorkflowStub(workflowId = "send-email")
SendEmailWorkflow workflow;
public void startSubscription(WorkflowData data) {
workflow.run(data);
}
}
If the workflow is bound to multiple workers, the worker parameter is required:
public class MyService {
@Inject
@TemporalWorkflowStub(worker = "<default>", workflowId = "send-email")
SendEmailWorkflow workflow;
public void startSubscription(WorkflowData data) {
workflow.run(data);
}
}
The workflowId can be set at runtime by injecting a TemporalInstance:
public class MyService {
@Inject
@TemporalWorkflowStub
TemporalInstance<SendEmailWorkflow> instance;
public void startSubscription(WorkflowData data) {
SendEmailWorkflow workflow = instance.workflowId("the-workflow-id");
workflow.run(data);
}
}
OpenTelemetry
To wire up Temporal to forward traces and spans to Quarkus OpenTelemetry, simply add the OpenTelemetry extension to your application:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
This enables it by default. You can disable it with:
quarkus.temporal.telemetry.enabled=false
Micrometer Metrics
To wire up Temporal to forward metrics to Quarkus Micrometer, simply add the Micrometer extension to your application:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
</dependency>
This enables it by default. You can disable it with:
quarkus.temporal.metrics.enabled=false
Context Propagation
You can use an MDC (Mapped Diagnostic Context) context propagator to propagate information from the workflow client to the workflow execution, from a workflow to an activity, from a workflow to a child workflow, and from a workflow to a child thread created using Async.
To enable the MDC propagator or any custom propagator, simply produce a CDI bean implementing the ContextPropagator interface.
package com.yourcompany;
import java.util.HashMap;
import java.util.Map;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.jboss.logging.Logger;
import org.slf4j.MDC;
import com.google.protobuf.ByteString;
import io.quarkus.arc.Unremovable;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.GlobalDataConverter;
/**
* A {@link ContextPropagator} implementation that propagates the SLF4J MDC
* (Mapped Diagnostic Context) across Temporal workflow and activity boundaries.
* This class ensures that MDC entries with keys starting with "X-" are
* propagated.
*/
@Singleton
@Unremovable
public class MDCContextPropagator implements ContextPropagator {
private static final Logger LOG = Logger.getLogger(MDCContextPropagator.class);
/**
* Gets the name of the context propagator.
*
* @return the name of the context propagator, which is the fully qualified
* class name.
*/
@Override
public String getName() {
return this.getClass().getName();
}
/**
* Retrieves the current MDC context to be propagated.
*
* @return a map containing the current MDC context, filtered to include only
* entries with keys starting with "X-".
*/
@Override
public Object getCurrentContext() {
Map<String, String> context = new HashMap<>();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
if (mdcContext != null) {
mdcContext.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("X-"))
.forEach(entry -> context.put(entry.getKey(), entry.getValue()));
}
return context;
}
/**
* Sets the current MDC context from the given context map.
*
* @param context the context map containing MDC entries to be set.
*/
@Override
public void setCurrentContext(Object context) {
if (context instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> contextMap = (Map<String, String>) context;
contextMap.forEach(MDC::put);
}
}
/**
* Serializes the given context map to a map of Payloads.
*
* @param context the context map containing MDC entries to be serialized.
* @return a map of Payloads representing the serialized context.
*/
@Override
public Map<String, Payload> serializeContext(Object context) {
if (!(context instanceof Map)) {
return new HashMap<>();
}
@SuppressWarnings("unchecked")
Map<String, String> contextMap = (Map<String, String>) context;
Map<String, Payload> serializedContext = new HashMap<>();
contextMap.forEach((key, value) -> GlobalDataConverter.get().toPayload(value)
.ifPresent(payload -> serializedContext.put(key, payload)));
return serializedContext;
}
/**
* Deserializes the given map of Payloads to a context map.
*
* @param context the map of Payloads to be deserialized.
* @return a context map containing the deserialized MDC entries.
*/
@Override
public Object deserializeContext(Map<String, Payload> context) {
Map<String, String> contextMap = new HashMap<>();
context.forEach((key, payload) -> {
// Handle empty {} when the data value is empty
// Adding opentracing seems to add a new value with empty data
// and the dataconverter throws an error
// This actually might be a configuration error from earlier
// but leaving in right now
//
// {_tracer-data=metadata {
// key: "encoding"
// value: "json/plain"
// }
// data: "{}"
// }
try {
String payloadValue = StringUtils.EMPTY; // default value
// Convert data to string to compare
ByteString data = payload.getData();
// Check the value to see if it "empty"
if (data != null && !data.isEmpty()) {
// Check if the value isn't {}'s
if (!StringUtils.equals("{}", data.toStringUtf8())) {
payloadValue = GlobalDataConverter.get().fromPayload(payload, String.class, String.class);
}
}
// Add the value into the map
contextMap.put(key, payloadValue);
} catch (Exception e) {
LOG.warnf("Couldn't parse MDC Context Data Key %s", key);
}
});
return contextMap;
}
}
Interceptors
Interceptors are a mechanism for modifying inbound and outbound SDK calls. Interceptors are commonly used to add tracing and authorization to the scheduling and execution of Workflows and Activities. You can compare these to "middleware" in other frameworks.
To enable interceptors, simply produce a CDI bean implementing the WorkflowClientInterceptor interface or the WorkerInterceptor interface. Interceptors can be ordered by declaring a @Priority on each interceptor, so they are executed in that order.
package com.yourcompany;
import java.util.Optional;
import jakarta.annotation.Priority;
import jakarta.inject.Singleton;
import io.quarkus.arc.Unremovable;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.ActivityCompletionClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
@Singleton
@Unremovable
@Priority(1)
public class TestWorkflowClientInterceptor implements WorkflowClientInterceptor {
@Override
public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions options, WorkflowStub next) {
return next;
}
@Override
public WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType, WorkflowStub next) {
return next;
}
@Override
public ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next) {
return next;
}
@Override
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(WorkflowClientCallsInterceptor next) {
return next;
}
}
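The `next` argument that each interceptor method receives is what makes ordering matter: every interceptor wraps the one after it. The following is a minimal, self-contained sketch of that wrapping pattern, not the Temporal or extension API. The `Handler` and `Interceptor` types and the `tagging` helper are hypothetical stand-ins, and the sketch assumes that lower priority values run first, which this page does not state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical, simplified model of priority-ordered interceptors; not the Temporal API. */
final class InterceptorChainSketch {

    /** Stand-in for the call being intercepted. */
    interface Handler {
        String handle(String request);
    }

    /** Stand-in for an interceptor: receives the next handler and returns a wrapper around it. */
    interface Interceptor {
        int priority();

        Handler wrap(Handler next);
    }

    private InterceptorChainSketch() {
    }

    /** Creates an interceptor that appends its tag to the request before delegating to next. */
    static Interceptor tagging(int priority, String tag) {
        return new Interceptor() {
            @Override
            public int priority() {
                return priority;
            }

            @Override
            public Handler wrap(Handler next) {
                return request -> next.handle(request + ">" + tag);
            }
        };
    }

    /** Builds one handler in which interceptors run in ascending priority order. */
    static Handler chain(List<Interceptor> interceptors, Handler terminal) {
        List<Interceptor> ordered = new ArrayList<>(interceptors);
        // ASSUMPTION: lower priority values run first.
        ordered.sort(Comparator.comparingInt(Interceptor::priority));
        Handler current = terminal;
        // Wrap from last to first so the first interceptor ends up outermost.
        for (int i = ordered.size() - 1; i >= 0; i--) {
            current = ordered.get(i).wrap(current);
        }
        return current;
    }
}
```

With `tagging(2, "auth")` and `tagging(1, "trace")` registered in that order and a terminal handler that returns its input, `chain(...).handle("call")` yields `call>trace>auth`: the priority 1 interceptor sees the call first regardless of registration order.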
Extension Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Description | Type | Default
---|---|---
enable mock for testing | boolean |
If Temporal registers in the health check by pinging the service. | boolean |
Enable OpenTelemetry instrumentation, enabled by default if OpenTelemetry capability is detected. | boolean |
enable mock for testing | boolean |
A Namespace is a unit of isolation within the Temporal Platform. | string |
Override human-readable identity of the worker. Identity is used to identify a worker and is recorded in the workflow history events. For example when a worker gets an activity task the correspondent ActivityTaskStarted event contains the worker identity as a field. Default is whatever ManagementFactory.getRuntimeMXBean().getName() returns. | string |
Enable Micrometer, enabled by default if Micrometer capability is detected. | boolean |
The interval at which we report metrics to the metric registry. Default is 15 seconds. | |
Set a unique identifier for this worker. The identifier should be stable with respect to the code the worker uses for workflows, activities, and interceptors. For more information see: TODO: Doc link. A Build Id must be set if useBuildIdForVersioning is set true. Defaults to the latest commit id. | string |
Explicitly bind a workflow with this worker | list of string |
Explicitly bind a workflow with this worker | list of string |
Task queue name worker uses to poll. It uses this name for both workflow and activity task queue polls. Default is worker name. | string |
Maximum number of activities started per second by this worker. Default is 0 which means unlimited. | double |
Maximum number of activities executed in parallel. Default is 200, which is chosen if set to zero. | int |
Maximum number of simultaneously executed workflow tasks. Default is 200, which is chosen if set to zero. | int |
Maximum number of local activities executed in parallel. Default is 200, which is chosen if set to zero. | int |
Sets the rate limiting on number of activities that can be executed per second. This is managed by the server and controls activities per second for the entire task queue across all the workers. Notice that the number is represented in double, so that you can set it to less than 1 if needed. For example, set the number to 0.1 means you want your activity to be executed once every 10 seconds. This can be used to protect down stream services from flooding. The zero value of these uses the default value. Default is unlimited. | double |
Sets the maximum number of simultaneous long poll requests to the Temporal Server to retrieve workflow tasks. Changing this value will affect the rate at which the worker is able to consume tasks from a task queue. Due to internal logic where pollers alternate between sticky and non-sticky queues, this value cannot be 1 and will be adjusted to 2 if set to that value. Default is 5, which is chosen if set to zero. | int |
Number of simultaneous poll requests on activity task queue. Consider incrementing if the worker is not throttled due to | int |
If set to true worker would only handle workflow tasks and local activities. Non-local activities will not be executed by this worker. Default is false. | boolean |
Time period in ms that will be used to detect workflows deadlock. Default is 1000ms, which is chosen if set to zero. Specifies an amount of time in milliseconds that workflow tasks are allowed to execute without interruption. If workflow task runs longer than specified interval without yielding (like calling an Activity), it will fail automatically. | long |
The maximum amount of time between sending each pending heartbeat to the server. Regardless of heartbeat timeout, no pending heartbeat will wait longer than this amount of time to send. Default is 60s, which is chosen if set to null or 0. | |
The default amount of time between sending each pending heartbeat to the server. This is used if the ActivityOptions do not provide a HeartbeatTimeout. Otherwise, the interval becomes a value a bit smaller than the given HeartbeatTimeout. Default is 30s, which is chosen if set to null or 0. | |
Timeout for a workflow task routed to the "sticky worker" - host that has the workflow instance cached in memory. Once it times out, then it can be picked up by any worker. Default value is 5 seconds. | |
Disable eager activities. If set to true, eager execution will not be requested for activities requested from workflows bound to this Worker. Eager activity execution means the server returns requested eager activities directly from the workflow task back to this worker which is faster than non-eager which may be dispatched to a separate worker. Defaults to false, meaning that eager activity execution is permitted. | boolean |
Opts the worker in to the Build-ID-based versioning feature. This ensures that the worker will only receive tasks which it is compatible with. For more information see: TODO: Doc link. Defaults to false. | boolean |
During graceful shutdown, as when calling WorkerFactory.shutdown(), if the workflow cache is enabled, this timeout controls how long to wait for the sticky task queue to drain before shutting down the worker. If set the worker will stop making new poll requests on the normal task queue, but will continue to poll the sticky task queue until the timeout is reached. This value should always be greater than clients rpc long poll timeout, which can be set via WorkflowServiceStubsOptions.Builder.setRpcLongPollTimeout(Duration). Default is not to wait. | |
Override identity of the worker primary specified in a WorkflowClient options. | string |
Specifies server behavior if a completed workflow with the same id exists. Note that under no conditions Temporal allows two workflows with the same namespace and workflow id run simultaneously. See setWorkflowIdConflictPolicy for handling a workflow id duplication with a Running workflow. Default value if not set: AllowDuplicate | |
Specifies server behavior if a Running workflow with the same id exists. See setWorkflowIdReusePolicy for handling a workflow id duplication with a Closed workflow. Cannot be set when workflow-id-reuse-policy is WorkflowIdReusePolicy. Default value if not set: Fail | |
The time after which a workflow run is automatically terminated by Temporal service with WORKFLOW_EXECUTION_TIMED_OUT status. The default is set to the same value as the Workflow Execution Timeout. | |
The time after which workflow execution (which includes run retries and continue as new) is automatically terminated by Temporal service with WORKFLOW_EXECUTION_TIMED_OUT status. The default value is ∞ (infinite) - [TO DO]: check with temporal how to set this infinite value | |
Maximum execution time of a single Workflow Task. In the majority of cases there is no need to change this timeout. Note that this timeout is not related to the overall Workflow duration in any way. It defines for how long the Workflow can get blocked in the case of a Workflow Worker crash. The default value is 10 seconds. Maximum value allowed by the Temporal Server is 1 minute. | |
cron schedule | string |
If WorkflowClient is used to create a WorkerFactory that is started, has a non-paused worker on the right task queue, and has available workflow task executor slots, and such a WorkflowClient is used to start a workflow, then the first workflow task could be dispatched on this local worker with the response to the start call if Server supports it. This option can be used to disable this mechanism. Default is true. | boolean |
Time to wait before dispatching the first workflow task. If the workflow gets a signal before the delay, a workflow task will be dispatched and the rest of the delay will be ignored. A signal from signal with start will not trigger a workflow task. Cannot be set the same time as a CronSchedule. | |
Description | Type | Default
---|---|---
Sets a target string, which can be either a valid | string |
Sets option to enable SSL/TLS/HTTPS for gRPC. | boolean |
Interval of the first retry, on regular failures. If coefficient is 1.0 then it is used for all retries. Defaults to 100ms. | |
Interval of the first retry, on congestion related failures (i.e. RESOURCE_EXHAUSTED errors). If coefficient is 1.0 then it is used for all retries. Defaults to 1000ms. | |
Maximum time to retry. When exceeded the retries stop even if maximum retries is not reached yet. Defaults to 1 minute. | |
Coefficient used to calculate the next retry interval. The next retry interval is previous interval multiplied by this coefficient. Must be 1 or larger. Default is 1.5. | double |
When exceeded the amount of attempts, stop. Even if expiration time is not reached. Default is unlimited which is chosen if set to 0. | int |
Maximum interval between retries. Exponential backoff leads to interval increase. This value is the cap of the increase. Default is 50x of initial interval. Can’t be less than initial-interval. | |
Maximum amount of jitter to apply. 0.2 means that actual retry time can be +/- 20% of the calculated time. Set to 0 to disable jitter. Must be lower than 1. Default is 0.2. | double |
Makes request that receives a server response with gRPC code and failure of detailsClass type non-retryable. | list of Code |
Description | Type | Default
---|---|---
List of application failures types to not retry | list of string |
Interval of the first retry. If coefficient is 1.0 then it is used for all retries. Default is 1 second. | |
Coefficient used to calculate the next retry interval. The next retry interval is previous interval multiplied by this coefficient. Must be 1 or larger. Default is 2.0. | double |
When exceeded the amount of attempts, stop. Even if expiration time is not reached. Default is unlimited if set to 0. | int |
Maximum interval between retries. Exponential backoff leads to interval increase. This value is the cap of the increase. Default is 100x of initial interval. Can’t be less than initialInterval. | |
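To make the retry settings above concrete, here is a minimal sketch of how the initial interval, backoff coefficient, and maximum interval combine into a sequence of waits. It follows only the rules stated in the descriptions (each interval is the previous one multiplied by the coefficient, capped at the maximum) and ignores jitter, expiration, and maximum attempts. `RetryBackoffSketch` is a hypothetical helper written for illustration; it is not part of the extension or the Temporal SDK.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating capped exponential backoff; not part of any library. */
final class RetryBackoffSketch {

    private RetryBackoffSketch() {
    }

    /**
     * Returns the wait before each of the first {@code retries} retries.
     * Each wait is the previous wait multiplied by {@code coefficient}, capped at {@code max}.
     */
    static List<Duration> intervals(Duration initial, double coefficient, Duration max, int retries) {
        if (coefficient < 1.0) {
            throw new IllegalArgumentException("coefficient must be 1 or larger");
        }
        if (max.compareTo(initial) < 0) {
            throw new IllegalArgumentException("max interval can't be less than initial interval");
        }
        double maxMillis = max.toMillis();
        double currentMillis = initial.toMillis();
        List<Duration> result = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            result.add(Duration.ofMillis((long) currentMillis));
            // Grow the interval for the next retry, never exceeding the cap.
            currentMillis = Math.min(currentMillis * coefficient, maxMillis);
        }
        return result;
    }
}
```

With the workflow retry defaults listed above (initial interval of 1 second, coefficient 2.0, maximum of 100x the initial interval), `intervals(Duration.ofSeconds(1), 2.0, Duration.ofSeconds(100), 9)` gives waits of 1, 2, 4, 8, 16, 32, 64, 100, and 100 seconds. The same arithmetic applies to the RPC retry options with their own defaults (100ms, 1.5, 50x).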
About the Duration format
To write duration values, use the standard You can also use a simplified format, starting with a number:
In other cases, the simplified format is translated to the
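The surviving text above does not spell out the simplified format, so the following sketch rests on an assumption: that it follows the convention commonly documented for Quarkus configuration, where a bare number means seconds, a number followed by `ms` means milliseconds, a number followed by `h`, `m`, or `s` is prefixed with `PT`, and a number followed by `d` is prefixed with `P` before being parsed by `java.time.Duration`. `SimplifiedDurationSketch` is a hypothetical helper for illustration, not the converter the extension actually uses.

```java
import java.time.Duration;
import java.util.regex.Pattern;

/** Hypothetical parser for the assumed simplified duration format; not the real converter. */
final class SimplifiedDurationSketch {

    private static final Pattern DIGITS = Pattern.compile("[0-9]+");
    private static final Pattern MILLIS = Pattern.compile("[0-9]+ms");

    private SimplifiedDurationSketch() {
    }

    static Duration parse(String raw) {
        String value = raw.trim();
        if (DIGITS.matcher(value).matches()) {
            // ASSUMPTION: a bare number is a number of seconds.
            return Duration.ofSeconds(Long.parseLong(value));
        }
        if (MILLIS.matcher(value).matches()) {
            // ASSUMPTION: a number followed by "ms" is a number of milliseconds.
            return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2)));
        }
        if (!value.isEmpty() && value.charAt(0) >= '0' && value.charAt(0) <= '9') {
            // ASSUMPTION: "2d" becomes "P2d"; "10m" becomes "PT10m".
            String prefix = value.endsWith("d") || value.endsWith("D") ? "P" : "PT";
            return Duration.parse(prefix + value);
        }
        // Anything else is treated as the standard ISO-8601 form, for example "PT5S".
        return Duration.parse(value);
    }
}
```

Under these assumptions, `parse("15")` is 15 seconds, `parse("250ms")` is 250 milliseconds, `parse("10m")` is 10 minutes, and `parse("2d")` is 2 days. Values that fit neither form make `Duration.parse` throw a `DateTimeParseException`.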