Writing a Batch Application with Quarkus
What is Batch Processing?
Batch processing is typified by bulk-oriented, non-interactive, background execution. Frequently long-running, it may be data or computationally intensive, execute sequentially or in parallel, and may be initiated through various invocation models, including ad hoc, scheduled, and on-demand.
Why Batch?
- Use idle resources and shift processing time by scheduling jobs to off-peak hours
- Process high-volume datasets and manage extensive repeated work
- Handle complex business logic
Writing a Batch Application
Getting Started
First, add the io.quarkiverse.jberet:quarkus-jberet extension to your build file:
Maven:
<dependency>
    <groupId>io.quarkiverse.jberet</groupId>
    <artifactId>quarkus-jberet</artifactId>
    <version>2.8.0</version>
</dependency>
Gradle:
implementation("io.quarkiverse.jberet:quarkus-jberet:2.8.0")
Job and Steps
A Job is an entity that encapsulates an entire batch process, and it is simply a container for Steps. A Step is a
domain object that encapsulates an independent, sequential phase of a batch job. A Step can be as simple as loading
data from a file into the database or as complex as processing payments in a banking system. A batch Step is either
a Chunk or a Batchlet.
A Job can be wired together via the Job Specification Language (JSL) in XML, or programmatically via
an org.jberet.job.model.JobBuilder.
Running a Job
The JobOperator provides operations to start, stop, restart, and inspect jobs. The JobOperator can be
obtained programmatically or by injection:
import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.BatchRuntime;
import java.util.Properties;

void execute() {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    long executionId = jobOperator.start("jobName", new Properties());
}
import jakarta.batch.operations.JobOperator;
import jakarta.inject.Inject;
import java.util.Properties;

@Inject
JobOperator jobOperator;

void execute() {
    jobOperator.start("jobName", new Properties());
}
To run a Job, call JobOperator.start(String, Properties), where the first argument is the Job name (jobName)
and the second argument is a Properties object holding the job parameters for the Job execution.
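Job parameters passed to start can be referenced inside the Job XML via property substitution. A minimal sketch (the #{jobParameters['...']} syntax is defined by the Jakarta Batch specification; the jobName, myBatchlet, and directory names here are illustrative):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<job id="jobName" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
    <step id="onlyStep">
        <batchlet ref="myBatchlet">
            <properties>
                <!-- resolved from the Properties passed to JobOperator.start -->
                <property name="directory" value="#{jobParameters['directory']}"/>
            </properties>
        </batchlet>
    </step>
</job>
```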
Batchlet
A Batchlet is the simplest batch component, representing a task-oriented step that executes a single operation from
start to finish. It’s ideal for simple, non-chunked processing tasks like:
- File cleanup or archival operations
- Sending notification emails
- Database maintenance tasks
- External API calls
- Simple data validation
package org.acme.batch;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.api.BatchProperty;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
public class FileCleanupBatchlet extends AbstractBatchlet {

    // Step properties injected from the <properties> element of the Job definition
    @Inject
    @BatchProperty
    String directory;

    @Inject
    @BatchProperty
    int daysToKeep;

    @Override
    public String process() throws Exception {
        Path dirPath = Paths.get(directory);
        long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24L * 60L * 60L * 1000L);
        try (Stream<Path> files = Files.walk(dirPath)) {
            for (Path path : files.filter(Files::isRegularFile).toList()) {
                if (Files.getLastModifiedTime(path).toMillis() < cutoffTime) {
                    Files.delete(path);
                }
            }
        }
        return BatchStatus.COMPLETED.toString();
    }
}
Note: All Batch components are CDI beans.
The Batchlet must be part of a Job definition to be executed:
<?xml version="1.0" encoding="UTF-8"?>
<job id="fileCleanupJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
    <step id="cleanupStep">
        <batchlet ref="fileCleanupBatchlet"> (1)
            <properties>
                <property name="directory" value="/tmp/batch"/> (2)
                <property name="daysToKeep" value="30"/> (3)
            </properties>
        </batchlet>
    </step>
</job>
Note: The Job XML definition file must be placed in the META-INF/batch-jobs directory (src/main/resources/META-INF/batch-jobs in a typical Maven or Gradle project).
(1) fileCleanupBatchlet is the reference name of the FileCleanupBatchlet. By default, a Batch component's name is its fully qualified class name (org.acme.batch.FileCleanupBatchlet). When a Batch component is annotated with @Named, its name is shortened to the component's simple name with the first letter in lowercase (fileCleanupBatchlet).
(2) The directory property sets which directory should be scanned for files.
(3) The daysToKeep property sets how many days files are kept before being deleted.
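The default-naming convention can be sketched in plain Java. This is only an illustration of the CDI @Named defaulting rule described above, not code from the extension:

```java
public class DefaultBeanName {
    // CDI's @Named default: the simple class name with its first letter lower-cased,
    // e.g. org.acme.batch.FileCleanupBatchlet -> "fileCleanupBatchlet"
    public static String of(Class<?> type) {
        String simple = type.getSimpleName();
        return Character.toLowerCase(simple.charAt(0)) + simple.substring(1);
    }
}
```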
Alternatively, the Job definition may also be declared programmatically:
package org.acme.batch;

import jakarta.enterprise.inject.Produces;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import org.jberet.job.model.Job;
import org.jberet.job.model.JobBuilder;
import org.jberet.job.model.StepBuilder;

@Singleton
public class FileCleanupJob {

    @Produces
    @Named
    public Job fileCleanupJob() {
        return new JobBuilder("fileCleanupJob")
                .step(new StepBuilder("cleanupStep")
                        .batchlet("fileCleanupBatchlet")
                        .property("directory", "/tmp/batch")
                        .property("daysToKeep", "30")
                        .build())
                .build();
    }
}
To execute this Job:
import jakarta.batch.operations.JobOperator;
import jakarta.inject.Inject;
import java.util.Properties;

@Inject
JobOperator jobOperator;

void execute() {
    Properties properties = new Properties();
    properties.setProperty("directory", "/tmp/batch");
    properties.setProperty("daysToKeep", "30");
    long executionId = jobOperator.start("fileCleanupJob", properties);
}
The JobOperator.start creates a new JobInstance and runs the first execution of that instance, which executes
asynchronously.
Job Status
A JobInstance refers to the concept of a logical job run. Let’s say that the fileCleanupJob must be run daily. Each
daily run has its own JobInstance, so it can be tracked separately.
Each JobInstance can have multiple JobExecutions. A JobExecution refers to the concept of an attempt to run
a Job. When a Job is first started with a JobInstance, it also creates its first JobExecution. The
JobExecution tracks the status of a JobInstance.
JobOperator jobOperator = BatchRuntime.getJobOperator();
long executionId = jobOperator.start("fileCleanupJob", properties);
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
The JobOperator can be queried for the JobExecution, using the execution id returned by the start method. With
the JobExecution, it is possible to retrieve:
-
The start time and end time of the batch
-
The batch status and exit status
-
The batch parameters
A JobExecution's first status is STARTING, and it can transition between statuses until the JobExecution finishes
and ends with one of the statuses STOPPED, FAILED, COMPLETED, or ABANDONED.
| Status | Description |
|---|---|
| STARTING | The job has been submitted to the batch runtime |
| STARTED | The job is running |
| STOPPING | The job has been requested to stop |
| STOPPED | The job has stopped |
| FAILED | The job finished executing because of an error |
| COMPLETED | The job finished executing successfully |
| ABANDONED | The job was marked abandoned |
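Because start executes asynchronously, callers typically poll getBatchStatus() until one of the four terminal statuses above is reached. A minimal sketch of that check, written here over the status names (a real caller would compare jakarta.batch.runtime.BatchStatus values directly):

```java
import java.util.Set;

public class TerminalStatus {
    // The four terminal statuses from the table above
    private static final Set<String> TERMINAL = Set.of("STOPPED", "FAILED", "COMPLETED", "ABANDONED");

    // A JobExecution whose status is in this set will not transition further
    public static boolean isTerminal(String batchStatusName) {
        return TERMINAL.contains(batchStatusName);
    }
}
```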
Configuring a JobRepository
A JobRepository holds information about jobs currently running and jobs that have run in the past. The JobOperator
interface provides access to this repository. The repository contains job instances, job executions, and step
executions in a persistent way. By default, JBeret uses an in-memory JobRepository.
Using an in-memory JobRepository is far from ideal in real production environments. For that reason, it is
recommended to change the repository to a persistent store that can store the batch metadata permanently after the
batch application shuts down.
Note: Keep in mind that, with the in-memory repository, information about executed jobs accumulates and can eventually fill the whole memory, leading to out-of-memory errors.
The JDBC JobRepository
To use the JDBC JobRepository, set the configuration quarkus.jberet.repository.type=jdbc. This requires a
connection to a valid JDBC datasource. To create a DataSource, please follow
the Configure Data Sources in Quarkus guide. By default, the repository uses the default
(unnamed) datasource. Setting quarkus.jberet.repository.jdbc.datasource allows you to point to a
named datasource instead:
quarkus.datasource.batch.db-kind=postgresql
quarkus.datasource.batch.username=<your username>
quarkus.datasource.batch.password=<your password>
quarkus.datasource.batch.jdbc.url=jdbc:postgresql://localhost:5432/batch
quarkus.jberet.repository.jdbc.datasource=batch
Quarkus Batch automatically creates the required schema the first time it connects to the datasource. The DDL scripts support the following databases:
- H2
- Oracle
- Microsoft SQL Server
- MySQL
- PostgreSQL
- DB2
- Sybase
- Derby
- HyperSQL
- Firebird
Quarkus Batch also detects the target database to execute the right DDL scripts.
In the scenario where the application uses a non-supported database engine, or a database change management schema like Flyway or Liquibase, you can find the required DDL scripts in:
The Quarkus JBeret extension offers configuration to override the DDL and SQL files, as well as customizing the table names with prefixes and/or suffixes:
Configuration property fixed at build time - All other configuration properties are overridable at runtime

| Configuration property | Type | Default |
|---|---|---|
| quarkus.jberet.repository.type - The repository type to store JBeret and Job data; defaults to the in-memory repository. | string | |
| quarkus.jberet.repository.jdbc.datasource - The datasource name for the JBeret Repository. By default, it uses the default (unnamed) datasource. | string | |
| Custom DDL file resource for JBeret tables creation; if using custom table names, the custom SQL queries file must also be set. | string | |
| Custom queries to be used to query JBeret tables; this is mandatory if custom table names are used in a custom DDL file. | string | |
| JBeret tables name prefix. | string | |
| JBeret tables name suffix. | string | |
Schedule a Job
While a Job can be executed by calling JobOperator.start, the most common scenario is to configure each Job to
execute on a schedule:
quarkus.jberet.job."fileCleanupJob".cron=0 0 23 ? * * *
The cron expression 0 0 23 ? * * * instructs the JobScheduler to execute the Job at
23:00:00 every day. The syntax used for cron expressions is based on Quartz.
See Cron Trigger for
additional information.
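The trigger semantics of that expression (fire at 23:00 every day) can be illustrated with plain java.time. This is only a sketch of the schedule's behavior, not how Quartz parses or evaluates the expression:

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

public class DailyTrigger {
    // Next fire time for a "daily at `at`" schedule, evaluated from `now`:
    // today at `at` if that is still in the future, otherwise tomorrow at `at`
    public static LocalDateTime nextRun(LocalDateTime now, LocalTime at) {
        LocalDateTime todayAt = now.toLocalDate().atTime(at);
        return now.isBefore(todayAt) ? todayAt : todayAt.plusDays(1);
    }
}
```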
The Batch Runtime exposes a JobScheduler that keeps track of all scheduled jobs:
import jakarta.inject.Inject;
import org.jberet.schedule.JobScheduler;
@Inject
JobScheduler jobScheduler;
With the JobScheduler, it is possible to list, add, or cancel scheduled jobs.
Testing
Since batch applications execute jobs asynchronously, the test code must explicitly wait for the job to complete;
otherwise, the test may finish before the job does. You can use
Awaitility to poll the Job status and keep the test running until
the job finishes:
Maven:
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
Gradle:
testImplementation("org.awaitility:awaitility")
And the test:
package org.acme.batch;

import static org.awaitility.Awaitility.await;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.BatchStatus;
import jakarta.batch.runtime.JobExecution;
import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
class FileCleanupJobTest {

    @Inject
    JobOperator jobOperator;

    @Test
    void fileCleanup() {
        Properties properties = new Properties();
        properties.setProperty("directory", "/tmp/batch");
        properties.setProperty("daysToKeep", "30");

        long executionId = jobOperator.start("fileCleanupJob", properties); (1)

        await().atMost(5, TimeUnit.SECONDS) (2)
               .until(() -> {
                   JobExecution jobExecution = jobOperator.getJobExecution(executionId);
                   return BatchStatus.COMPLETED.equals(jobExecution.getBatchStatus()); (3)
               });
    }
}
(1) Execute the job fileCleanupJob and keep a reference to the executionId.
(2) Wait at most 5 seconds until the test condition becomes true; if the condition is still false when the 5 seconds elapse, the wait is interrupted and the test fails.
(3) Query the JobOperator for the status of the Job and check whether it reached BatchStatus.COMPLETED.