Quarkus Zeebe
Quarkus Zeebe extension. Zeebe’s cloud-native design provides the performance, resilience, and security enterprises need to future-proof their process orchestration efforts.
Installation
To use this extension, first add the io.quarkiverse.zeebe:quarkus-zeebe dependency to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
<groupId>io.quarkiverse.zeebe</groupId>
<artifactId>quarkus-zeebe</artifactId>
<version>0.8.0</version>
</dependency>
Upgrade
In version >= 0.8.0 we replaced the @ZeebeWorker annotation with the @JobWorker annotation.
In version >= 0.7.0 we removed the hazelcast dependency and zeebe-simple-monitor for test and dev services. We now use zeebe-test-container with the debug exporter and zeebe-dev-monitor. In the test module we removed our own assert API and switched to Camunda BpmnAssert from zeebe-process-test. Test API migration: io.quarkiverse.zeebe.test.BpmnAssert → io.camunda.zeebe.process.test.assertions.BpmnAssert
Configuration
Exemplary Setup for your local development
Generally speaking, there are three ways to configure your Quarkus project to speak with Camunda:
- Local dev instance with dev services
- Shared local dev instance
- Direct interaction with Camunda SaaS / on-premise
You can see some exemplary configurations for each of the setups below. Please note that these are only exemplary and can be adapted to your needs.
Local dev instance with dev services
# enable auto load bpmn resources
quarkus.zeebe.resources.enabled=true
# src/main/resources/bpmn
quarkus.zeebe.resources.location=bpmn
# Enable zeebe Dev Service:
quarkus.zeebe.devservices.enabled=true
# only start devservices, if no running docker container is found
quarkus.zeebe.devservices.shared=true
# zeebe service name
quarkus.zeebe.devservices.service-name=zeebe_broker
# enable reusable zeebe test-container (https://www.testcontainers.org/features/reuse/)
quarkus.zeebe.devservices.reuse=false
# enable zeebe monitor Dev Service:
quarkus.zeebe.devservices.monitor.enabled=true
# zeebe monitor service name
quarkus.zeebe.devservices.monitor.service-name=zeebe-dev-monitor
# enable reusable zeebe test-container (https://www.testcontainers.org/features/reuse/)
quarkus.zeebe.devservices.monitor.reuse=false
# enable hot restart for bpmn subdirectories changes
quarkus.zeebe.dev-mode.watch-bpmn-dir=true
# enable hot restart for bpmn files changes
quarkus.zeebe.dev-mode.watch-bpmn-files=true
# enable hot restart for job worker changes
quarkus.zeebe.dev-mode.watch-job-worker=true
Shared local dev instance
quarkus.zeebe.client.broker.gateway-address=localhost:26500
# If you are sure that there is already an instance running, you can disable dev services directly
quarkus.zeebe.devservices.enabled=false
quarkus.zeebe.devservices.shared=true
quarkus.zeebe.devservices.monitor.service-name=zeebe-dev-monitor
quarkus.zeebe.devservices.service-name=zeebe_broker
Direct interaction with Camunda Cloud live instance
Preferably you would be using a dev instance of Camunda and not your production process engine ;)
# Disable local dev services
quarkus.zeebe.devservices.enabled=false
# Enter your cloud credentials from the zeebe portal
quarkus.zeebe.client.broker.gateway-address=
# Specify a cluster id.
quarkus.zeebe.client.cloud.cluster-id=
# Specify a client id.
quarkus.zeebe.client.cloud.client-id=
# Specify a client secret to request an access token.
quarkus.zeebe.client.cloud.client-secret=
# Specify a cloud region.
quarkus.zeebe.client.cloud.region=
quarkus.zeebe.client.cloud.base-url=zeebe.camunda.io
quarkus.zeebe.client.cloud.auth-url=https://login.cloud.camunda.io/oauth/token
quarkus.zeebe.client.cloud.port=443
# Make sure you are disabling plaintext security, otherwise connection will fail
quarkus.zeebe.client.security.plaintext=false
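The cloud properties above relate to the address of a Camunda SaaS cluster, which follows the pattern cluster-id.region.base-url:port. The sketch below only illustrates that pattern; the class and method names are hypothetical, and whether the extension derives the gateway address exactly this way is an assumption, so set quarkus.zeebe.client.broker.gateway-address explicitly if in doubt.

```java
// Hypothetical helper, not part of the extension: composes a SaaS-style
// gateway address from the individual cloud settings.
final class CloudGatewayAddress {

    static String compose(String clusterId, String region, String baseUrl, int port) {
        // Pattern: <cluster-id>.<region>.<base-url>:<port>
        return String.format("%s.%s.%s:%d", clusterId, region, baseUrl, port);
    }

    public static void main(String[] args) {
        // "my-cluster" and "bru-2" are placeholder values.
        System.out.println(compose("my-cluster", "bru-2", "zeebe.camunda.io", 443));
    }
}
```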
Metrics
Whether Zeebe metrics are enabled is controlled by the quarkus.zeebe.metrics.enabled build-time property.
The default is true, but it is shown here to indicate how it can be disabled.
quarkus.zeebe.metrics.enabled=true
You can access the following metrics:
- camunda.job.invocations: Number of invocations of job workers (tagged with the job type). The action tag takes one of these values:
  - activated: The job was activated and started to process an item
  - completed: The processing was completed successfully
  - failed: The processing failed with some exception
  - bpmn-error: The processing completed by throwing a ZeebeBpmnError (which means there was no technical problem)
Example:
# TYPE camunda_job_invocations counter
# HELP camunda_job_invocations
camunda_job_invocations_total{action="activated",type="gateway-empty-data"} 1.0
camunda_job_invocations_total{action="completed",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="activated",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="completed",type="gateway-read-data"} 2.0
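To make the tagging in the sample output concrete, here is a minimal plain-Java sketch of a counter keyed by action and job type. It is not the extension's implementation (the extension relies on the metrics extension present in your project); the class and method names are hypothetical and the rendering only mimics the Prometheus text format shown above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration: one counter per (action, type) tag combination.
final class JobInvocationCounters {

    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Increments the counter for the given action (e.g. "activated") and job type.
    void increment(String action, String type) {
        String key = "camunda_job_invocations_total{action=\"" + action + "\",type=\"" + type + "\"}";
        counters.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    // Renders all counters, sorted by key, one sample per line.
    String render() {
        StringBuilder out = new StringBuilder();
        new TreeMap<>(counters).forEach((key, value) ->
                out.append(key).append(' ').append((double) value.sum()).append('\n'));
        return out.toString();
    }
}
```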
Micrometer
If you already have your Quarkus project configured, you can add the quarkus-micrometer-registry-prometheus extension to your project.
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
</dependency>
Tracing
Whether Zeebe tracing is enabled is controlled by the quarkus.zeebe.tracing.enabled build-time property.
The default is true, but it is shown here to indicate how it can be disabled.
quarkus.zeebe.tracing.enabled=true
OpenTelemetry
If you already have your Quarkus project configured, you can add the quarkus-opentelemetry-exporter-otlp extension to your project.
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry-exporter-otlp</artifactId>
</dependency>
Dev-Services
Dev Services for Zeebe is automatically enabled unless:
* quarkus.zeebe.devservices.enabled is set to false
* quarkus.zeebe.client.broker.gateway-address is configured
Dev Services for Zeebe relies on Docker to start the broker. If your environment does not support Docker, you need to start the broker manually or connect to an already running broker. You can configure the broker address using quarkus.zeebe.client.broker.gateway-address.
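The two conditions above can be read as a simple decision rule. The following sketch expresses it in plain Java; the class and method are hypothetical and only mirror the rule as described here, not the extension's actual startup code.

```java
// Hypothetical illustration of the activation rule for Dev Services.
final class DevServicesDecision {

    /**
     * @param devServicesEnabled value of the devservices "enabled" property, or null if unset
     * @param gatewayAddress     configured gateway address, or null if unset
     */
    static boolean shouldStart(Boolean devServicesEnabled, String gatewayAddress) {
        // An unset "enabled" property counts as enabled.
        boolean enabled = devServicesEnabled == null || devServicesEnabled;
        boolean gatewayConfigured = gatewayAddress != null && !gatewayAddress.isBlank();
        return enabled && !gatewayConfigured;
    }

    public static void main(String[] args) {
        System.out.println(shouldStart(null, null));
        System.out.println(shouldStart(true, "localhost:26500"));
    }
}
```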
To activate Zeebe-Dev-Monitor Dev Service use this configuration:
quarkus.zeebe.devservices.enabled=true
quarkus.zeebe.devservices.monitor.enabled=true
The property quarkus.zeebe.devservices.monitor.enabled=true also activates the debug exporter.
Usage
@JobWorker
You need to configure the job type via the JobWorker annotation:
@JobWorker(type = "my-job")
public void executeMyJob() {
// handles jobs of type 'my-job'
}
If you don’t specify the type, the method name is used as the default:
@JobWorker
public void test() {
// handles jobs of type 'test'
}
Or you can set a default job type:
quarkus.zeebe.client.job.default-type=test
Variables
You can specify that you only want to fetch some of the variables when executing a job, which can decrease load and improve performance:
@JobWorker(type="test", fetchVariables={"var1", "var2"})
public void test(final JobClient client, final ActivatedJob job) {
String var1 = (String) job.getVariablesAsMap().get("var1");
System.out.println(var1);
// ...
}
@Variable
The @Variable annotation is a shortcut that makes variable retrieval simpler, including the type cast:
@JobWorker(type="test")
public void test(final JobClient client, final ActivatedJob job, @Variable String var1) {
System.out.println(var1);
// ...
}
With @Variable or fetchVariables you limit which variables are loaded from the workflow engine.
You can also override this with fetchAllVariables and force all variables to be loaded anyway:
@JobWorker(type="test", fetchAllVariables=true)
public void test(final JobClient client, final ActivatedJob job) {
String var1 = (String) job.getVariablesAsMap().get("var1");
System.out.println(var1);
// ...
}
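The interplay of fetchVariables and fetchAllVariables can be pictured as a filter over the process variables. This is a plain-Java sketch with hypothetical names; in reality the filtering happens when the job is activated on the engine side, and the sketch assumes that an empty fetch list means all variables are loaded.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of which variables a worker would see.
final class VariableFetchSketch {

    static Map<String, Object> visibleVariables(Map<String, Object> all,
                                                List<String> fetchVariables,
                                                boolean fetchAllVariables) {
        // fetchAllVariables overrides any restriction; an empty list is
        // assumed to mean "no restriction".
        if (fetchAllVariables || fetchVariables.isEmpty()) {
            return new LinkedHashMap<>(all);
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (String name : fetchVariables) {
            if (all.containsKey(name)) {
                result.put(name, all.get(name));
            }
        }
        return result;
    }
}
```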
@VariablesAsType
You can also map the process variables into your own class (comparable with getVariablesAsType() in the Java Client API). To do so, use the @VariablesAsType annotation. In the example below, Parameter refers to your own class:
@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p) {
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    // the returned object is added to the process variables
    return p;
}
Fetch variables via Job
You can access variables of a process via the ActivatedJob object, which is passed into the method if it is a parameter:
@JobWorker(type="test")
public void test(final ActivatedJob job) {
String var1 = (String) job.getVariablesAsMap().get("var1");
System.out.println(var1);
// ...
}
@CustomHeader
You can use the @CustomHeader annotation on a parameter to retrieve a custom header of a job:
@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p, @CustomHeader String header1, @CustomHeader("custom-header") String header2) {
    System.out.println(header1);
    System.out.println(header2);
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    return p;
}
@CustomHeaders
You can use the @CustomHeaders annotation on a parameter to retrieve all custom headers of a job:
@JobWorker(type = "job1")
public void job1(@VariablesAsType Parameter p, @CustomHeaders Map<String, String> headers) {
System.out.println(headers.get("header1"));
System.out.println(headers.get("custom-header"));
System.out.println(p.getValue());
p.setValue(1);
// ... custom code
}
Auto-completing jobs
By default, the autoComplete attribute is set to true for any job worker. In this case, the Quarkus extension takes care of job completion for you:
@JobWorker(type = "job1")
public void job1(final ActivatedJob job) {
// ... custom code ...
// no need to call client.newCompleteCommand()...
}
Note that the code within the handler method needs to be synchronously executed, as the completion will be triggered right after the method has finished.
When using autoComplete you can:
* Return a Map, String, InputStream, or Object, which is then added to the process variables
* Throw a ZeebeBpmnError, which results in a BPMN error being sent to Zeebe
* Throw any other Exception, which leads to a failure being handed over to Zeebe
@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
// ... custom code ...
if (ok) {
return responseMap;
} else {
throw new ZeebeBpmnError("Error code", "Error message");
}
}
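The three outcomes listed above can be summarized as a small dispatch rule. The sketch below models it in plain Java without the Zeebe client: BpmnError is a hypothetical stand-in for ZeebeBpmnError, and the returned strings merely name the command that would be sent. It is an illustration of the rule, not the extension's code.

```java
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical illustration of how a handler result maps to a Zeebe command.
final class AutoCompleteSketch {

    /** Stand-in for ZeebeBpmnError, carrying only the BPMN error code. */
    static final class BpmnError extends RuntimeException {
        final String code;

        BpmnError(String code, String message) {
            super(message);
            this.code = code;
        }
    }

    static String outcome(Callable<Map<String, Object>> handler) {
        try {
            // Normal return: the job is completed with the returned variables.
            Map<String, Object> variables = handler.call();
            return "complete " + variables;
        } catch (BpmnError e) {
            // Business error: a BPMN error with the given code is raised.
            return "throw-error " + e.code;
        } catch (Exception e) {
            // Anything else: the job is reported as failed.
            return "fail " + e.getMessage();
        }
    }
}
```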
Programmatically completing jobs
Your job worker code can also complete the job itself. This gives you more control over when exactly the job is completed (e.g. allowing the completion to be moved to reactive callbacks):
@JobWorker(type = "job1", autoComplete = false)
public void job1(final JobClient client, final ActivatedJob job) {
    // ... custom code ...
    client.newCompleteCommand(job.getKey()).send()
        .exceptionally(throwable -> { throw new RuntimeException("Could not complete job " + job, throwable); });
}
Ideally, you don’t use blocking behavior like send().join(), as this is a blocking call that waits for the issued command to be executed on the workflow engine. While this is very straightforward to use and produces easy-to-read code, blocking code is limited in terms of scalability.
That’s why the worker above uses a different pattern (exceptionally). Often you might also want to use the whenComplete callback:
client.newCompleteCommand(job.getKey()).send()
.whenComplete((result, exception) -> {});
This registers a callback to be executed if the command on the workflow engine was executed or resulted in an exception. This allows for parallelism. This is discussed in more detail in this blog post about writing good workers for Camunda Cloud.
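To show the callback pattern in isolation, here is a minimal sketch that uses java.util.concurrent.CompletableFuture as a stand-in for the future returned by send(). The sendComplete method is hypothetical and simulates the engine's answer on another thread; only the whenComplete handling corresponds to the pattern described above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of non-blocking completion handling.
final class CompletionCallbackSketch {

    // Simulates sending a complete command; the result arrives asynchronously.
    static CompletableFuture<String> sendComplete(long jobKey, boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            if (succeed) {
                future.complete("completed " + jobKey);
            } else {
                future.completeExceptionally(new IllegalStateException("gateway unavailable"));
            }
        });
        return future;
    }

    // Registers a callback instead of blocking; the caller gets a future
    // describing the outcome and is free to continue with other work.
    static CompletableFuture<String> completeJob(long jobKey, boolean succeed) {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        sendComplete(jobKey, succeed).whenComplete((result, exception) ->
                outcome.complete(exception == null ? result : "failed: " + exception.getMessage()));
        return outcome;
    }
}
```

The calling thread returns immediately after registering the callback, which is what allows many jobs to be in flight at the same time.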
Note that when completing jobs programmatically, you must specify autoComplete = false. Otherwise, there is a race condition between your programmatic job completion and the job completion performed by the Quarkus extension, which can lead to unpredictable results.
Throwing ZeebeBpmnError
Whenever your code hits a problem that should lead to a BPMN error being raised, you can simply throw a ZeebeBpmnError providing the error code used in BPMN:
@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
// ... custom code ...
if (ok) {
return responseMap;
} else {
throw new ZeebeBpmnError("Error code", "Error message");
}
}
Non-blocking Methods
By default, a job worker method is executed on the main executor for blocking tasks. As a result, a technology that is designed to run on a Vert.x event loop (such as Hibernate Reactive) cannot be used inside the method body. For this reason, a job worker method that returns java.util.concurrent.CompletionStage<?> or io.smallrye.mutiny.Uni<Void>, or that is annotated with @io.smallrye.common.annotation.NonBlocking, is executed on the Vert.x event loop instead.
@JobWorker(type = "job1")
public Uni<Void> job1(final ActivatedJob job) {
    // ... custom code ...
    // no need to call client.newCompleteCommand()...
    return Uni.createFrom().voidItem();
}
The return type Uni<Void> instructs the job worker to execute the method on the Vert.x event loop.
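The selection rule described in this section can be sketched with plain reflection. The example below is an illustration under assumptions, not the extension's code: the nested NonBlocking annotation is a local stand-in for io.smallrye.common.annotation.NonBlocking, and the Uni return type is left out because Mutiny is not part of the JDK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration of choosing the execution model per method.
final class ExecutionModelSketch {

    /** Local stand-in for the SmallRye @NonBlocking annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface NonBlocking {}

    // True if the method qualifies for the event loop according to the rule above.
    static boolean runsOnEventLoop(Method method) {
        return CompletionStage.class.isAssignableFrom(method.getReturnType())
                || method.isAnnotationPresent(NonBlocking.class);
    }

    /** Sample worker methods used to exercise the rule. */
    static class Workers {
        void blocking() {}

        CompletionStage<Void> async() {
            return CompletableFuture.completedFuture(null);
        }

        @NonBlocking
        void annotated() {}
    }

    static boolean check(String methodName) {
        try {
            return runsOnEventLoop(Workers.class.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unknown method: " + methodName, e);
        }
    }
}
```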
Testing
To use the test extension, add this dependency to the project:
<dependency>
<groupId>io.quarkiverse.zeebe</groupId>
<artifactId>quarkus-zeebe-test</artifactId>
<version>{version}</version>
<scope>test</scope>
</dependency>
To use the ZeebeClient and BpmnAssert in your tests, annotate the test class with @QuarkusTestResource(ZeebeTestResource.class) and enable this configuration:
quarkus.zeebe.devservices.enabled=true
Test example
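import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.util.Map;
import org.junit.jupiter.api.Test;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
@QuarkusTestResource(ZeebeTestResource.class)
public class BaseTest {

    @InjectZeebeClient
    ZeebeClient client;

    @Test
    public void startProcessTest() {
        // start an instance of the process with id "test"
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
                .bpmnProcessId("test").latestVersion()
                .variables(Map.of("k", "v")).send().join();
        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        // poll until the process instance has completed
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}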
We can reuse the test for the integration test.
import io.quarkus.test.junit.QuarkusIntegrationTest;
@QuarkusIntegrationTest
public class BaseIT extends BaseTest {
}
For more information, check the examples in the integration-tests directory in this repo.
Testing with embedded engine
The Zeebe process test embedded engine requires Java version >= 17.
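import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.util.Map;
import org.junit.jupiter.api.Test;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestEmbeddedResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
@QuarkusTestResource(ZeebeTestEmbeddedResource.class)
public class BaseTest {

    @InjectZeebeClient
    ZeebeClient client;

    @Test
    public void startProcessTest() {
        // start an instance of the process with id "test"
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
                .bpmnProcessId("test").latestVersion()
                .variables(Map.of("k", "v")).send().join();
        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        // poll until the process instance has completed
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}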
Extension Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime | Type | Default |
---|---|---|
If DevServices has been explicitly enabled or disabled. DevServices is generally enabled by default, unless there is an existing configuration present. When DevServices is enabled Quarkus will attempt to automatically configure and start a database when running in Dev or Test mode and when Docker is running. Environment variable: | boolean | |
Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly. Environment variable: | int | |
Indicates if the Zeebe server managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Zeebe starts a new container. The discovery uses the Environment variable: | boolean | |
The value of the Environment variable: | string | |
The container image name to use, for container based DevServices providers. Environment variable: | string | |
Helper to define the stop strategy for containers created by DevServices. In particular, we don’t want to actually stop the containers when they have been flagged for reuse, and when the Testcontainers configuration has been explicitly set to allow container reuse. To enable reuse, ass Environment variable: | boolean | |
Enable or disable dev monitor for dev-services. Environment variable: | boolean | |
Optional fixed port the dev monitor will listen to. If not defined, the port will be chosen randomly. Environment variable: | int | |
The container image name to use, for container based zeebe simple monitor. Environment variable: | string | |
The value of the Environment variable: | string | |
Helper to define the stop strategy for containers created by DevServices. In particular, we don’t want to actually stop the containers when they have been flagged for reuse, and when the Testcontainers configuration has been explicitly set to allow container reuse. To enable reuse, ass Environment variable: | boolean | |
Optional fixed debug export receiver port the dev service will listen to. If not defined, the port will be chosen randomly. Environment variable: | int | |
Disable or enable debug exporter for the test. Environment variable: | boolean | |
Enable or disable debug exporter. Environment variable: | boolean | |
Fixed debug export receiver port the localhost service will listen to. Environment variable: | int | |
Observe changes in the bpmn files. Environment variable: | boolean | |
Observe changes in the bpmn directory and subdirectories. Environment variable: | boolean | |
Observe changes in the job worker. Environment variable: | boolean | |
Whether an auto scan BPMN process folder. Default true Environment variable: | boolean | |
BPMN process root folder. Default bpmn Environment variable: | string | |
Whether a metrics is enabled in case the micrometer or micro-profile metrics extension is present. Environment variable: | boolean | |
Whether a health check is published in case the smallrye-health extension is present. Environment variable: | boolean | |
Whether an opentracing is published in case the smallrye-opentracing extension is present. Environment variable: | boolean | |