Quarkus Zeebe
Quarkus Zeebe extension. Zeebe’s cloud-native design provides the performance, resilience, and security enterprises need to future-proof their process orchestration efforts.
Installation
To use this extension, first add the io.quarkiverse.zeebe:quarkus-zeebe extension to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
    <groupId>io.quarkiverse.zeebe</groupId>
    <artifactId>quarkus-zeebe</artifactId>
    <version>1.6.0</version>
</dependency>
Upgrade
In version >= 0.8.0 we replaced the @ZeebeWorker annotation with the @JobWorker annotation.
In version >= 0.7.0 we removed the Hazelcast dependency and zeebe-simple-monitor from the test and dev services. They now use zeebe-test-container with the debug exporter and zeebe-dev-monitor. In the test module we removed our own assert API and switched to Camunda BpmnAssert from zeebe-process-test. Test API migration: io.quarkiverse.zeebe.test.BpmnAssert → io.camunda.zeebe.process.test.assertions.BpmnAssert
Configuration
Exemplary Setup for your local development
Generally speaking, there are three ways to configure your Quarkus project to talk to Camunda:
* Local dev instance with dev services
* Shared local dev instance
* Direct interaction with Camunda SaaS / on-premise
You can see some exemplary configurations for each of the setups below. Please note that these are only exemplary and can be adapted to your needs.
Local dev instance with dev services
# enable auto load bpmn resources
quarkus.zeebe.resources.enabled=true
# src/main/resources/bpmn
quarkus.zeebe.resources.location=bpmn
# Enable zeebe Dev Service:
quarkus.zeebe.devservices.enabled=true
# only start devservices, if no running docker container is found
quarkus.zeebe.devservices.shared=true
# zeebe service name
quarkus.zeebe.devservices.service-name=zeebe_broker
# enable reusable zeebe test-container (https://www.testcontainers.org/features/reuse/)
quarkus.zeebe.devservices.reuse=false
# enable hot restart for bpmn subdirectories changes
quarkus.zeebe.dev-mode.watch-bpmn-dir=true
# enable hot restart for bpmn files changes
quarkus.zeebe.dev-mode.watch-bpmn-files=true
# enable hot restart for job worker changes
quarkus.zeebe.dev-mode.watch-job-worker=true
Shared local dev instance
quarkus.zeebe.client.broker.gateway-address=localhost:26500
# If you are sure that there is already an instance running, you can disable Dev Services directly
quarkus.zeebe.devservices.enabled=false
quarkus.zeebe.devservices.shared=true
quarkus.zeebe.devservices.service-name=zeebe_broker
Direct interaction with Camunda Cloud live instance
Preferably you would be using a dev instance of Camunda and not your production process engine ;)
# Disable local dev services
quarkus.zeebe.devservices.enabled=false
# Enter your cloud credentials from the zeebe portal
quarkus.zeebe.client.broker.gateway-address=
# Specify a cluster id.
quarkus.zeebe.client.cloud.cluster-id=
# Specify a client id.
quarkus.zeebe.client.cloud.client-id=
# Specify a client secret to request an access token.
quarkus.zeebe.client.cloud.client-secret=
# Specify a cloud region.
quarkus.zeebe.client.cloud.region=
quarkus.zeebe.client.cloud.base-url=zeebe.camunda.io
quarkus.zeebe.client.cloud.auth-url=https://login.cloud.camunda.io/oauth/token
quarkus.zeebe.client.cloud.port=443
# Make sure you are disabling plaintext security, otherwise connection will fail
quarkus.zeebe.client.security.plaintext=false
Metrics
Zeebe metrics are enabled or disabled with the quarkus.zeebe.metrics.enabled build-time property.
The default is true, but it is shown here to indicate how it can be disabled.
quarkus.zeebe.metrics.enabled=true
You can access the following metrics:
* camunda.job.invocations: Number of invocations of job workers (tagged with the job type), counted per action:
  * activated: The job was activated and started to process an item
  * completed: The processing was completed successfully
  * failed: The processing failed with some exception
  * bpmn-error: The processing completed by throwing a ZeebeBpmnError (which means there was no technical problem)
Example:
# TYPE camunda_job_invocations counter
# HELP camunda_job_invocations
camunda_job_invocations_total{action="activated",type="gateway-empty-data"} 1.0
camunda_job_invocations_total{action="completed",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="activated",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="completed",type="gateway-read-data"} 2.0
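To get a quick per-action total from output like the example above, the samples can be summed by their action label. The following is a minimal sketch that only works on the exposition text and assumes the label order shown in the example (action first, then type); the class JobInvocationMetricsSketch and the method totalsByAction are hypothetical helpers, not part of the extension.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: sums camunda_job_invocations_total samples per action label.
class JobInvocationMetricsSketch {

    // Matches one sample line; assumes the label order action, type as in the example.
    private static final Pattern SAMPLE = Pattern.compile(
            "^camunda_job_invocations_total\\{action=\"([^\"]+)\",type=\"([^\"]+)\"\\}\\s+(\\S+)$");

    static Map<String, Double> totalsByAction(String exposition) {
        Map<String, Double> totals = new TreeMap<>();
        for (String line : exposition.split("\\R")) {
            Matcher m = SAMPLE.matcher(line.trim());
            if (m.matches()) {
                // Comment lines (# TYPE, # HELP) do not match and are skipped.
                totals.merge(m.group(1), Double.parseDouble(m.group(3)), Double::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        String sample = "# TYPE camunda_job_invocations counter\n"
                + "camunda_job_invocations_total{action=\"activated\",type=\"gateway-empty-data\"} 1.0\n"
                + "camunda_job_invocations_total{action=\"completed\",type=\"gateway-show-data\"} 2.0\n"
                + "camunda_job_invocations_total{action=\"activated\",type=\"gateway-show-data\"} 2.0\n"
                + "camunda_job_invocations_total{action=\"completed\",type=\"gateway-read-data\"} 2.0\n";
        System.out.println(totalsByAction(sample)); // prints {activated=3.0, completed=4.0}
    }
}
```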
Tracing
Zeebe tracing is enabled or disabled with the quarkus.zeebe.tracing.enabled build-time property.
The default is true, but it is shown here to indicate how it can be disabled.
quarkus.zeebe.tracing.enabled=true
Dev-Services
Dev Services for Zeebe is automatically enabled unless:
* quarkus.zeebe.devservices.enabled is set to false
* quarkus.zeebe.client.broker.gateway-address is configured
Dev Services for Zeebe relies on Docker to start the broker. If your environment does not support Docker, you need to start the broker manually or connect to an already running broker. You can configure the broker address using quarkus.zeebe.client.broker.gateway-address.
To activate Dev Service use this configuration:
quarkus.zeebe.devservices.enabled=true
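The activation rule described in this section can be restated as a small predicate. This is only a sketch of the documented rule under the assumption that both settings are read as optional values; DevServicesRuleSketch and startsBroker are hypothetical names and this is not the extension's own code.

```java
import java.util.Optional;

// Hypothetical helper restating the documented rule; not the extension's implementation.
class DevServicesRuleSketch {

    // Returns true when Dev Services would start a broker:
    // not explicitly disabled and no gateway address configured.
    static boolean startsBroker(Optional<Boolean> devServicesEnabled, Optional<String> gatewayAddress) {
        boolean explicitlyDisabled = devServicesEnabled.isPresent() && !devServicesEnabled.get();
        boolean addressConfigured = gatewayAddress.isPresent();
        return !explicitlyDisabled && !addressConfigured;
    }

    public static void main(String[] args) {
        // Nothing configured: Dev Services starts a broker.
        System.out.println(startsBroker(Optional.empty(), Optional.empty()));
        // An address is configured: the existing broker is used instead.
        System.out.println(startsBroker(Optional.empty(), Optional.of("localhost:26500")));
    }
}
```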
Usage
@JobWorker
You need to configure the job type via the JobWorker annotation:
@JobWorker(type = "my-job")
public void executeMyJob() {
    // handles jobs of type 'my-job'
}
If you don’t specify the type, the method name is used as the default:
@JobWorker
public void test() {
    // handles jobs of type 'test'
}
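The fallback from an explicit type to the method name can be pictured with plain reflection. This is a minimal sketch, assuming a locally defined stand-in annotation (DemoJobWorker) instead of the extension's @JobWorker; the resolution logic is illustrative only and ignores any configured default job type.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class JobTypeResolutionSketch {

    // Hypothetical stand-in for the extension's @JobWorker annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoJobWorker {
        String type() default "";
    }

    static class Workers {
        @DemoJobWorker(type = "my-job")
        void executeMyJob() {
        }

        @DemoJobWorker
        void test() {
        }
    }

    // Uses the annotation's type if set, otherwise falls back to the method name.
    static String resolveType(Method method) {
        DemoJobWorker worker = method.getAnnotation(DemoJobWorker.class);
        if (worker == null) {
            throw new IllegalArgumentException(method.getName() + " is not a job worker");
        }
        return worker.type().isEmpty() ? method.getName() : worker.type();
    }

    // Convenience lookup that wraps the checked reflection exception.
    static String resolveType(Class<?> owner, String methodName) {
        try {
            return resolveType(owner.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveType(Workers.class, "executeMyJob")); // prints my-job
        System.out.println(resolveType(Workers.class, "test")); // prints test
    }
}
```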
Or you can set a default job type:
quarkus.zeebe.client.job.default-type=test
Variables
You can specify that you only want to fetch some variables when executing a job, which can decrease load and improve performance:
@JobWorker(type="test", fetchVariables={"var1", "var2"})
public void test(final JobClient client, final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}
@Variable
The @Variable annotation is a shortcut that makes variable retrieval simpler, including the type cast:
@JobWorker(type="test")
public void test(final JobClient client, final ActivatedJob job, @Variable String var1) {
    System.out.println(var1);
    // ...
}
With @Variable or fetchVariables you limit which variables are loaded from the workflow engine.
You can also override this with fetchAllVariables and force all variables to be loaded anyway:
@JobWorker(type="test", fetchAllVariables=true)
public void test(final JobClient client, final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}
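The effect of limiting the fetched variables, and the cast that @Variable saves you, can be illustrated on a plain map. This is a minimal sketch that does not use the Zeebe API; VariableAccessSketch, fetchOnly and variable are hypothetical helpers that only mimic the idea.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers working on a plain map instead of an ActivatedJob.
class VariableAccessSketch {

    // Keeps only the requested variables, similar in spirit to fetchVariables.
    static Map<String, Object> fetchOnly(Map<String, Object> all, List<String> names) {
        Map<String, Object> fetched = new LinkedHashMap<>();
        for (String name : names) {
            if (all.containsKey(name)) {
                fetched.put(name, all.get(name));
            }
        }
        return fetched;
    }

    // Reads one variable with a checked cast instead of a raw (String) cast.
    static <T> T variable(Map<String, Object> variables, String name, Class<T> type) {
        Object value = variables.get(name);
        if (value != null && !type.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Variable '" + name + "' is not a " + type.getSimpleName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> all = new LinkedHashMap<>();
        all.put("var1", "hello");
        all.put("var2", 42);
        all.put("var3", true);
        Map<String, Object> fetched = fetchOnly(all, List.of("var1", "var2"));
        System.out.println(fetched.keySet()); // prints [var1, var2]
        System.out.println(variable(fetched, "var1", String.class)); // prints hello
    }
}
```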
@VariablesAsType
You can also map the process variables into your own class (comparable with getVariablesAsType() in the Java Client API). To do so, use the @VariablesAsType annotation. In the example below, Parameter refers to your own class:
@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p) {
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    return p;
}
Fetch variables via Job
You can access variables of a process via the ActivatedJob object, which is passed into the method if it is a parameter:
@JobWorker(type="test")
public void test(final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}
@CustomHeader
You can use the @CustomHeader annotation on a parameter to retrieve a single custom header of a job:
@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p, @CustomHeader String header1, @CustomHeader("custom-header") String header2) {
    System.out.println(header1);
    System.out.println(header2);
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    return p;
}
@CustomHeaders
You can use the @CustomHeaders annotation on a parameter to retrieve all custom headers of a job:
@JobWorker(type = "job1")
public void job1(@VariablesAsType Parameter p, @CustomHeaders Map<String, String> headers) {
    System.out.println(headers.get("header1"));
    System.out.println(headers.get("custom-header"));
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
}
Auto-completing jobs
By default, the autoComplete attribute is set to true for any job worker. In this case, the Quarkus extension takes care of job completion for you:
@JobWorker(type = "job1")
public void job1(final ActivatedJob job) {
    // ... custom code ...
    // no need to call client.newCompleteCommand()...
}
Note that the code within the handler method needs to be executed synchronously, as the completion is triggered right after the method has finished.
When using autoComplete you can:
* Return a Map, String, InputStream, or Object, which is then added to the process variables
* Throw a ZeebeBpmnError, which results in a BPMN error being sent to Zeebe
* Throw any other Exception, which leads to a failure being handed over to Zeebe
@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
    // ... custom code ...
    if (ok) {
        return responseMap;
    } else {
        throw new ZeebeBpmnError("Error code", "Error message");
    }
}
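The three outcomes of an auto-completed handler (return a value, throw a BPMN error, throw anything else) can be modelled without a broker. This is a minimal sketch of the dispatch idea only; AutoCompleteSketch, its run method and the DemoBpmnError class are hypothetical stand-ins and do not reflect how the extension is implemented.

```java
import java.util.Map;
import java.util.concurrent.Callable;

class AutoCompleteSketch {

    // Hypothetical stand-in for ZeebeBpmnError.
    static class DemoBpmnError extends RuntimeException {
        final String errorCode;

        DemoBpmnError(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }
    }

    // Runs the handler and reports which kind of command a wrapper would send afterwards.
    static String run(Callable<Map<String, Object>> handler) {
        try {
            Map<String, Object> variables = handler.call();
            return "complete " + variables;
        } catch (DemoBpmnError e) {
            return "throw-error " + e.errorCode;
        } catch (Exception e) {
            return "fail " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> Map.of("k", "v"))); // prints complete {k=v}
        System.out.println(run(() -> {
            throw new DemoBpmnError("E1", "business rule violated");
        })); // prints throw-error E1
        System.out.println(run(() -> {
            throw new IllegalStateException("boom");
        })); // prints fail boom
    }
}
```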
Programmatically completing jobs
Your job worker code can also complete the job itself. This gives you more control over when exactly the job is completed (e.g. allowing the completion to be moved to reactive callbacks):
@JobWorker(type = "job1", autoComplete = false)
public void job1(final JobClient client, final ActivatedJob job) {
    // ... custom code ...
    client.newCompleteCommand(job.getKey()).send()
        .exceptionally(throwable -> { throw new RuntimeException("Could not complete job " + job, throwable); });
}
Ideally, you don’t use blocking behavior like send().join(), as this is a blocking call that waits for the issued command to be executed on the workflow engine. While this is very straightforward to use and produces easy-to-read code, blocking code is limited in terms of scalability.
That’s why the worker above shows a different pattern (using exceptionally). Often you might also want to use the whenComplete callback:
client.newCompleteCommand(job.getKey()).send()
    .whenComplete((result, exception) -> {});
This registers a callback to be executed if the command on the workflow engine was executed or resulted in an exception. This allows for parallelism. This is discussed in more detail in this blog post about writing good workers for Camunda Cloud.
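The callback style can be tried without a broker. The following is a minimal sketch, assuming a plain CompletableFuture as a stand-in for the future returned by send(); CompletionCallbackSketch and describe are hypothetical names that are not part of the extension or the Zeebe client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in: a CompletableFuture plays the role of the future returned by send().
class CompletionCallbackSketch {

    // Registers a whenComplete callback and reports which branch has run so far.
    // The calling thread never waits: an unfinished command is reported as "pending".
    static String describe(CompletableFuture<String> commandFuture) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        commandFuture.whenComplete((result, exception) -> {
            if (exception != null) {
                outcome.set("failed: " + exception.getMessage());
            } else {
                outcome.set("completed: " + result);
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture("job-1")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("gateway unavailable"));
        System.out.println(describe(failed));

        // Still running: the callback stays registered and fires later, nothing blocks here.
        System.out.println(describe(new CompletableFuture<>()));
    }
}
```

In contrast, calling join() on the unfinished future in the last case would park the calling thread until the command finishes.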
Note that when completing jobs programmatically, you must specify autoComplete = false. Otherwise, there is a race condition between your programmatic job completion and the job completion performed by the Quarkus extension, which can lead to unpredictable results.
Throwing ZeebeBpmnError
Whenever your code hits a problem that should lead to a BPMN error being raised, you can simply throw a ZeebeBpmnError providing the error code used in BPMN:
@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
    // ... custom code ...
    if (ok) {
        return responseMap;
    } else {
        throw new ZeebeBpmnError("Error code", "Error message");
    }
}
Non-blocking Methods
By default, a job worker method is executed on the main executor for blocking tasks. As a result, a technology that is designed to run on a Vert.x event loop (such as Hibernate Reactive) cannot be used inside the method body. For this reason, a job worker method that returns java.util.concurrent.CompletionStage<?> or io.smallrye.mutiny.Uni<Void>, or is annotated with @io.smallrye.common.annotation.NonBlocking, is executed on the Vert.x event loop instead.
@JobWorker(type = "job1")
public Uni<Void> job1(final ActivatedJob job) {
    // ... custom code ...
    // no need to call client.newCompleteCommand()...
    return Uni.createFrom().voidItem();
}
The return type Uni<Void> instructs the job worker to execute the method on the Vert.x event loop.
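The rule for choosing the event loop can be written as a reflective check. This is a sketch of the documented rule only, under the assumption that the decision depends on the declared return type and on the presence of the NonBlocking annotation; the types are compared by name so the example needs no extra dependencies, and NonBlockingRuleSketch and runsOnEventLoop are hypothetical names.

```java
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical restatement of the documented rule; not the extension's implementation.
class NonBlockingRuleSketch {

    static boolean runsOnEventLoop(Method method) {
        Class<?> returnType = method.getReturnType();
        if (returnType == CompletionStage.class) {
            return true;
        }
        // Compared by name so that Mutiny is not needed on the classpath for this sketch.
        if (returnType.getName().equals("io.smallrye.mutiny.Uni")) {
            return true;
        }
        for (Annotation annotation : method.getAnnotations()) {
            if (annotation.annotationType().getName().equals("io.smallrye.common.annotation.NonBlocking")) {
                return true;
            }
        }
        return false;
    }

    static class Workers {
        void blockingJob() {
        }

        CompletionStage<Void> asyncJob() {
            return CompletableFuture.completedFuture(null);
        }
    }

    // Convenience lookup that wraps the checked reflection exception.
    static boolean runsOnEventLoop(String methodName) {
        try {
            return runsOnEventLoop(Workers.class.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOnEventLoop("blockingJob")); // prints false
        System.out.println(runsOnEventLoop("asyncJob")); // prints true
    }
}
```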
Testing
To use the test extension, add this dependency to the project:
<dependency>
    <groupId>io.quarkiverse.zeebe</groupId>
    <artifactId>quarkus-zeebe-test</artifactId>
    <version>{version}</version>
    <scope>test</scope>
</dependency>
To use the ZeebeClient and BpmnAssert in tests, add @QuarkusTestResource(ZeebeTestResource.class) to the test class and enable this configuration:
quarkus.zeebe.devservices.enabled=true
Test example
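import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import java.util.Map;
import org.junit.jupiter.api.Test;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
@QuarkusTest
@QuarkusTestResource(ZeebeTestResource.class)
public class BaseTest {
    @InjectZeebeClient
    ZeebeClient client;
    @Test
    public void startProcessTest() {
        // start the process 'test' and wait until it is completed
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
            .bpmnProcessId("test").latestVersion()
            .variables(Map.of("k","v")).send().join();
        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}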
We can reuse the test for the integration test.
import io.quarkus.test.junit.QuarkusIntegrationTest;
@QuarkusIntegrationTest
public class BaseIT extends BaseTest {
}
For more information, check the examples in the integration-tests directory in this repo.
Testing with embedded engine
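The Zeebe process test embedded engine requires Java version >= 17.
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import java.util.Map;
import org.junit.jupiter.api.Test;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestEmbeddedResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
@QuarkusTest
@QuarkusTestResource(ZeebeTestEmbeddedResource.class)
public class BaseTest {
    @InjectZeebeClient
    ZeebeClient client;
    @Test
    public void startProcessTest() {
        // start the process 'test' on the embedded engine and wait until it is completed
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
            .bpmnProcessId("test").latestVersion()
            .variables(Map.of("k","v")).send().join();
        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}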
Extension Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default |
---|---|---|
If DevServices has been explicitly enabled or disabled. DevServices is generally enabled by default, unless there is an existing configuration present. When DevServices is enabled, Quarkus will attempt to automatically configure and start a Zeebe broker when running in Dev or Test mode and when Docker is running. | boolean | |
Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly. | int | |
Optional fixed port the dev service REST service will listen to. If not defined, the port will be chosen randomly. | int | |
Indicates if the Zeebe server managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Zeebe starts a new container. Container sharing is only used in dev mode. | boolean | |
The service name used for label-based discovery of a shared container. This property is used when you need multiple shared Zeebe servers. | string | |
The container image name to use, for container based DevServices providers. | string | |
Helper to define the stop strategy for containers created by DevServices. In particular, we don’t want to actually stop the containers when they have been flagged for reuse, and when the Testcontainers configuration has been explicitly set to allow container reuse. | boolean | |
Optional fixed debug export receiver port the dev service will listen to. If not defined, the port will be chosen randomly. | int | |
Enable or disable the debug exporter for the test. | boolean | |
Enable or disable the debug exporter. | boolean | |
Enable or disable the Zeebe dashboard in the Dev UI. | boolean | |
Observe changes in the BPMN files. | boolean | |
Observe changes in the BPMN directory and subdirectories. | boolean | |
Observe changes in the job worker. | boolean | |
Whether to automatically scan the BPMN process folder. | boolean | true |
BPMN process root folder. | string | bpmn |
Whether metrics are enabled in case the Micrometer or MicroProfile Metrics extension is present. | boolean | |
Whether a health check is published in case the smallrye-health extension is present. | boolean | |
Whether OpenTracing is published in case the smallrye-opentracing extension is present. | boolean | |
Zeebe gateway address. | string | localhost:26500 |
Zeebe gateway REST address. | | localhost:8080 |
Client keep alive duration. | | |
Cloud cluster ID. | string | |
Cloud client secret ID. | string | |
Specify a client secret to request an access token. | string | |
Cloud region. | string | |
Cloud base URL. | string | |
Cloud authorization server URL. | string | |
Cloud port. | int | |
Cloud credentials cache path. | string | |
OAuth client secret ID. | string | |
Specify a client secret to request an access token. | string | |
Authorization server URL. | string | |
Credentials cache path. | string | |
OAuth connect timeout. | | |
OAuth read timeout. | | |
Zeebe token audience. | string | |
Maximum retries for the auto-completion command. | int | |
Maximum retries for the auto-completion command. | long | |
Sets the backoff supplier. The supplier is called to determine the retry delay after each failed request; the worker then waits until the returned delay has elapsed before sending the next request. Note that this is used only for the polling mechanism: failures in the JobHandler should be handled there, and retried there if need be. Sets the backoff multiplication factor. The previous delay is multiplied by this factor. | double | 1.5 |
Sets the jitter factor. The next delay is changed randomly within a range of +/- this factor. For example, if the next delay is calculated to be 1s and the jitterFactor is 0.1, then the actual next delay can be somewhere between 0.9 and 1.1s. | double | 0.2 |
Sets the maximum retry delay. Note that the jitter may push the retry delay over this maximum. | long | 1000ms |
Sets the minimum retry delay. Note that the jitter may push the retry delay below this minimum. | long | 50ms |
Client message time to live duration. | | |
Client security plaintext flag. | boolean | |
Specify a path to a certificate with which to validate gateway requests. | string | |
Overrides the authority used with TLS virtual hosting. Specifically, to override hostname verification in the TLS handshake. It does not change what host is actually connected to. | string | |
Client worker maximum active jobs. | int | |
Client worker number of threads. | int | |
Client worker default name. | string | |
Zeebe client request timeout configuration. | | |
Client worker global type. | string | |
Client job timeout. | | |
Client job poll interval. | | |
Sets the backoff supplier. The supplier is called to determine the retry delay after each failed request; the worker then waits until the returned delay has elapsed before sending the next request. Note that this is used only for the polling mechanism: failures in the JobHandler should be handled there, and retried there if need be. Sets the backoff multiplication factor. The previous delay is multiplied by this factor. | double | 1.6 |
Sets the jitter factor. The next delay is changed randomly within a range of +/- this factor. For example, if the next delay is calculated to be 1s and the jitterFactor is 0.1, then the actual next delay can be somewhere between 0.9 and 1.1s. | double | 0.1 |
Sets the maximum retry delay. Note that the jitter may push the retry delay over this maximum. | long | 5000ms |
Sets the minimum retry delay. Note that the jitter may push the retry delay below this minimum. | long | 50ms |
List of span names. | list of string | |
Zeebe client tenant ID. The tenant identifier which is used for tenant-aware commands when no tenant identifier is set. | string | |
Zeebe client default job worker tenant IDs. The tenant identifiers which are used for job-activation commands when no tenant identifiers are set. | list of string | |
Zeebe client is active. | boolean | |
Zeebe worker enable or disable flag. | boolean | |
Zeebe worker handler name. | string | |
Zeebe worker timeout. | long | |
Zeebe worker maximum jobs active. | int | |
Zeebe worker request timeout. | long | |
Zeebe worker poll interval. | long | |
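The backoff entries above (multiplication factor, jitter factor, minimum and maximum delay) describe how a retry delay evolves from one failed request to the next. The following is a minimal sketch of one way the four values can combine, written to match those descriptions; the Zeebe client's actual implementation may differ in detail. BackoffSketch, nextDelay and the randomUnit parameter (a value in [0, 1) passed in so that the result is reproducible) are hypothetical.

```java
// Hypothetical sketch of exponential backoff with jitter; not the Zeebe client's own code.
class BackoffSketch {

    // Multiplies the previous delay by the factor, clamps it to [min, max],
    // then shifts it randomly by at most +/- jitterFactor of the clamped delay.
    static long nextDelay(long currentDelayMs, double factor, double jitterFactor,
            long minDelayMs, long maxDelayMs, double randomUnit) {
        double delay = Math.max(Math.min(maxDelayMs, currentDelayMs * factor), minDelayMs);
        double jitter = delay * jitterFactor * (2.0 * randomUnit - 1.0);
        return Math.round(delay + jitter);
    }

    public static void main(String[] args) {
        // 1s delay with jitter factor 0.1: the result lies between 900 and 1100 ms.
        System.out.println(nextDelay(1000, 1.0, 0.1, 50, 5000, 0.0)); // prints 900
        // The jitter is applied after clamping, so the maximum can be exceeded.
        System.out.println(nextDelay(5000, 1.6, 0.1, 50, 5000, 0.99));
        // Grow a delay a few times with factor 1.6 and no jitter shift (randomUnit = 0.5).
        long delay = 50;
        for (int attempt = 1; attempt <= 5; attempt++) {
            delay = nextDelay(delay, 1.6, 0.1, 50, 5000, 0.5);
            System.out.println("attempt " + attempt + ": " + delay + " ms");
        }
    }
}
```

Passing the random value in as a parameter is a design choice for this example only; a real supplier would draw it from a random number generator on each call.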
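About the Duration format
To write duration values, use the standard java.time.Duration format. You can also use a simplified format, starting with a number:
* If the value is only a number, it represents time in seconds.
* If the value is a number followed by ms, it represents time in milliseconds.
In other cases, the simplified format is translated to the java.time.Duration format for parsing:
* If the value is a number followed by h, m, or s, it is prefixed with PT.
* If the value is a number followed by d, it is prefixed with P.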
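The translation from the simplified format to the standard one can be sketched in a few lines. This is a minimal sketch that assumes the usual Quarkus conversion rules (a bare number means seconds, a number followed by ms means milliseconds, a number followed by h, m or s is prefixed with PT, and a number followed by d is prefixed with P); SimplifiedDurationSketch is a hypothetical class and not the converter Quarkus itself uses.

```java
import java.time.Duration;

// Hypothetical re-implementation of the simplified duration format for illustration.
class SimplifiedDurationSketch {

    static Duration parse(String value) {
        String v = value.trim();
        if (v.matches("[0-9]+")) {
            // Only a number: seconds.
            return Duration.ofSeconds(Long.parseLong(v));
        }
        if (v.matches("[0-9]+ms")) {
            // Number followed by ms: milliseconds.
            return Duration.ofMillis(Long.parseLong(v.substring(0, v.length() - 2)));
        }
        if (v.matches("[0-9]+[hHmMsS]")) {
            // Number followed by h, m or s: prefix with PT.
            return Duration.parse("PT" + v);
        }
        if (v.matches("[0-9]+[dD]")) {
            // Number followed by d: prefix with P.
            return Duration.parse("P" + v);
        }
        // Anything else is expected to be in the standard format already.
        return Duration.parse(v);
    }

    public static void main(String[] args) {
        System.out.println(parse("30")); // prints PT30S
        System.out.println(parse("500ms")); // prints PT0.5S
        System.out.println(parse("10m")); // prints PT10M
        System.out.println(parse("PT1H30M")); // prints PT1H30M
    }
}
```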