Writing a Batch Application with Quarkus

What is Batch Processing?

Batch processing is typified by bulk-oriented, non-interactive, background execution. Frequently long-running, it may be data or computationally intensive, execute sequentially or in parallel, and may be initiated through various invocation models, including ad hoc, scheduled, and on-demand.

Why Batch?

  • Use idle resources and shift processing time by scheduling Jobs to off-peak hours

  • Process high-volume datasets and manage extensive repeated work

  • Handle complex business logic

Writing a Batch Application

Getting Started

First, add the io.quarkiverse.jberet:quarkus-jberet extension to your build file:

pom.xml
<dependency>
    <groupId>io.quarkiverse.jberet</groupId>
    <artifactId>quarkus-jberet</artifactId>
    <version>2.8.0</version>
</dependency>
build.gradle
implementation("io.quarkiverse.jberet:quarkus-jberet:2.8.0")

Job and Steps

A Job is an entity that encapsulates an entire batch process, and it is simply a container for Steps. A Step is a domain object that encapsulates an independent, sequential phase of a batch job. A Step can be as simple as loading data from a file into the database or as complex as processing payments in a banking system. A batch Step is either a Chunk or a Batchlet.

A Job can be wired together via the Job Specification Language (JSL) in XML, or programmatically via an org.jberet.job.model.JobBuilder.

Running a Job

The JobOperator provides operations to start, stop, restart, and inspect jobs. The JobOperator can be obtained programmatically or by injection:

Programmatically
import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.BatchRuntime;
import java.util.Properties;

void execute() {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    long executionId = jobOperator.start("jobName", new Properties());
}
Injection
import jakarta.batch.operations.JobOperator;
import jakarta.inject.Inject;
import java.util.Properties;

@Inject
JobOperator jobOperator;

void execute() {
    jobOperator.start("jobName", new Properties());
}

To run a Job, call JobOperator.start(String, Properties), where the first argument is the Job name (jobName) and the second argument is the set of Properties passed as job parameters to the Job execution.

Batchlet

A Batchlet is the simplest batch component, representing a task-oriented step that executes a single operation from start to finish. It’s ideal for simple, non-chunked processing tasks like:

  • File cleanup or archival operations

  • Sending notification emails

  • Database maintenance tasks

  • External API calls

  • Simple data validation

Batchlet
package org.acme.batch;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Named;

import org.eclipse.microprofile.config.inject.ConfigProperty;

@Named
public class FileCleanupBatchlet extends AbstractBatchlet {
    @ConfigProperty(name = "directory")
    String directory;
    @ConfigProperty(name = "daysToKeep")
    int daysToKeep;

    @Override
    public String process() throws Exception {
        Path dirPath = Paths.get(directory);
        long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24L * 60L * 60L * 1000L);

        try (Stream<Path> files = Files.walk(dirPath)) {
            for (Path path : files.filter(Files::isRegularFile).toList()) {
                if (Files.getLastModifiedTime(path).toMillis() < cutoffTime) {
                    Files.delete(path);
                }
            }
        }

        return BatchStatus.COMPLETED.toString();
    }
}

All Batch components are CDI Beans.
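
This means regular CDI injection works inside them. A minimal sketch, assuming a hypothetical NotificationService bean that is not part of this guide:

package org.acme.batch;

import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
public class NotificationBatchlet extends AbstractBatchlet {
    @Inject
    NotificationService notificationService; // hypothetical application bean

    @Override
    public String process() {
        // delegate to the injected bean, as in any other CDI component
        notificationService.send("batch step finished");
        return BatchStatus.COMPLETED.toString();
    }
}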

The Batchlet must be part of a Job definition to be executed:

fileCleanupJob.xml
<?xml version="1.0" encoding="UTF-8"?>
<job id="fileCleanupJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
  <step id="cleanupStep">
    <batchlet ref="fileCleanupBatchlet">                  (1)
      <properties>
        <property name="directory" value="/tmp/batch"/>   (2)
        <property name="daysToKeep" value="30"/>          (3)
      </properties>
    </batchlet>
  </step>
</job>

The Job XML Definition file must be placed in src/main/resources/META-INF/batch-jobs so Quarkus can discover the Job. The name of the Job is the file name without the extension: fileCleanupJob.
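
For the Job above, that means the definition file is located at:

src/main/resources/META-INF/batch-jobs/fileCleanupJob.xml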

1 The fileCleanupBatchlet is the reference name of the FileCleanupBatchlet. By default, a Batch component's name is its fully qualified class name (org.acme.batch.FileCleanupBatchlet). When a Batch component is annotated with @Named, its name becomes the component's simple name with the first letter in lowercase (fileCleanupBatchlet).
2 The directory property sets which directory is scanned for files
3 The daysToKeep property sets the retention period in days; files older than this are deleted

The directory and daysToKeep configuration can also be set in application.properties or any other available configuration source.
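
For example, assuming the property names above are looked up as-is from the configuration (a sketch, not part of the original Job definition), the same values could come from application.properties:

application.properties
directory=/tmp/batch
daysToKeep=30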

Alternatively, the Job definition may also be declared programmatically:

CDI Producer
package org.acme.batch;

import jakarta.enterprise.inject.Produces;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import org.jberet.job.model.Job;
import org.jberet.job.model.JobBuilder;
import org.jberet.job.model.StepBuilder;

@Singleton
public class FileCleanupJob {
    @Produces
    @Named
    public Job fileCleanupJob() {
        return new JobBuilder("fileCleanupJob")
                .step(new StepBuilder("cleanupStep")
                        .batchlet("fileCleanupBatchlet")
                        .property("directory", "/tmp/batch")
                        .property("daysToKepp", "30")
                        .build())
                .build();
    }
}

A Job declared and produced by a CDI bean is automatically discovered and exposed in the JobOperator.

To execute this Job:

import jakarta.batch.operations.JobOperator;
import jakarta.inject.Inject;
import java.util.Properties;

@Inject
JobOperator jobOperator;

void execute() {
    Properties properties = new Properties();
    properties.setProperty("directory", "/tmp/batch");
    properties.setProperty("daysToKeep", "30");
    long executionId = jobOperator.start("fileCleanupJob", properties);
}

The JobOperator.start creates a new JobInstance and runs the first execution of that instance, which executes asynchronously.

Job Status

A JobInstance refers to the concept of a logical job run. Let’s say that the fileCleanupJob must be run daily. Each daily run has its own JobInstance, so it can be tracked separately.

Each JobInstance can have multiple JobExecutions. A JobExecution refers to the concept of an attempt to run a Job. When a Job is first started with a JobInstance, it also creates its first JobExecution. The JobExecution tracks the status of a JobInstance.
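
A short sketch of this relationship, using the standard jakarta.batch JobOperator methods (the restart call is illustrative and only valid after the first execution ends in STOPPED or FAILED):

import java.util.List;
import java.util.Properties;

import jakarta.batch.runtime.JobExecution;
import jakarta.batch.runtime.JobInstance;

// The JobInstance behind an execution, and all executions of that instance
JobInstance jobInstance = jobOperator.getJobInstance(executionId);
List<JobExecution> jobExecutions = jobOperator.getJobExecutions(jobInstance);

// Restarting a STOPPED or FAILED execution creates a new JobExecution
// for the same JobInstance
long secondExecutionId = jobOperator.restart(executionId, new Properties());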

JobExecution
JobOperator jobOperator = BatchRuntime.getJobOperator();
long executionId = jobOperator.start("fileCleanupJob", properties);
JobExecution jobExecution = jobOperator.getJobExecution(executionId);

The JobOperator can be queried for the JobExecution, using the execution id returned by the start method. With the JobExecution, it is possible to retrieve the following (see the snippet after this list):

  • The start time and end time of the batch

  • The batch status and exit status

  • The batch parameters
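
For illustration, reading those values from the JobExecution obtained above (standard jakarta.batch accessors; Date and Properties are java.util types):

Date startTime = jobExecution.getStartTime();               // start time of the batch
Date endTime = jobExecution.getEndTime();                   // null until the execution ends
BatchStatus batchStatus = jobExecution.getBatchStatus();    // batch status
String exitStatus = jobExecution.getExitStatus();           // exit status
Properties jobParameters = jobExecution.getJobParameters(); // batch parameters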

A JobExecution's first status is STARTING, and it can transition between statuses until the JobExecution finishes with one of the statuses STOPPED, FAILED, COMPLETED, or ABANDONED.

Table 1. BatchStatus values

  Type        Usage
  STARTING    The job has been submitted to the batch runtime
  STARTED     The job is running
  STOPPING    The job has been requested to stop
  STOPPED     The job has stopped
  FAILED      The job finished executing because of an error
  COMPLETED   The job finished executing successfully
  ABANDONED   The job was marked abandoned

The JobExecution returned by JobOperator.getJobExecution does not update automatically as the Job executes. Invoke JobOperator.getJobExecution again whenever updated information is required.
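
A minimal polling sketch (illustrative only; production code would rather schedule the check or use a library like Awaitility, as shown in the Testing section below):

import java.util.EnumSet;

import jakarta.batch.runtime.BatchStatus;
import jakarta.batch.runtime.JobExecution;

EnumSet<BatchStatus> terminal = EnumSet.of(BatchStatus.STOPPED,
        BatchStatus.FAILED, BatchStatus.COMPLETED, BatchStatus.ABANDONED);

JobExecution jobExecution = jobOperator.getJobExecution(executionId);
while (!terminal.contains(jobExecution.getBatchStatus())) {
    Thread.sleep(100); // back off between polls (throws InterruptedException)
    jobExecution = jobOperator.getJobExecution(executionId); // re-query for fresh state
}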

Configuring a JobRepository

A JobRepository holds information about jobs currently running and jobs that have run in the past. The JobOperator interface provides access to this repository. The repository contains job instances, job executions, and step executions in a persistent way. By default, JBeret uses an in-memory JobRepository.

Using an in-memory JobRepository is far from ideal in real production environments. For that reason, it is recommended to change the repository to a persistent store that can store the batch metadata permanently after the batch application shuts down.

Please take into account that, with the in-memory repository, information about executed jobs can fill up the available memory and lead to OutOfMemoryError, especially in long-running applications with many job executions.

The JDBC JobRepository

To use the JDBC JobRepository, set quarkus.jberet.repository.type=jdbc. This requires a connection to a valid JDBC datasource. To create a DataSource, please follow the Configure Data Sources in Quarkus guide. By default, the repository uses the default (unnamed) datasource. Setting quarkus.jberet.repository.jdbc.datasource allows you to point to a named datasource instead:

quarkus.datasource.batch.db-kind=postgresql
quarkus.datasource.batch.username=<your username>
quarkus.datasource.batch.password=<your password>
quarkus.datasource.batch.jdbc.url=jdbc:postgresql://localhost:5432/batch

quarkus.jberet.repository.jdbc.datasource=batch

Quarkus Batch automatically creates the required schema the first time it connects to the datasource. The DDL scripts support the following databases:

  • H2

  • Oracle

  • Microsoft SQL Server

  • MySQL

  • PostgreSQL

  • DB2

  • Sybase

  • Derby

  • HyperSQL

  • Firebird

Quarkus Batch also detects the target database and executes the right DDL scripts.

In the scenario where the application uses an unsupported database engine, or a database change management tool like Flyway or Liquibase, the required DDL scripts can instead be applied manually.

The Quarkus JBeret extension offers configuration to override the DDL and SQL files, as well as to customize the table names with prefixes and/or suffixes:

Some configuration properties are fixed at build time; all other configuration properties are overridable at runtime.

  quarkus.jberet.repository.type
    The repository type to store JBeret and Job data. A jdbc type requires a JDBC datasource.
    Environment variable: QUARKUS_JBERET_REPOSITORY_TYPE
    Type: string. Default: in-memory

  quarkus.jberet.repository.jdbc.datasource
    The datasource name for the JBeret Repository. By default, it uses the default (unnamed) datasource.
    Environment variable: QUARKUS_JBERET_REPOSITORY_JDBC_DATASOURCE
    Type: string

  quarkus.jberet.repository.jdbc.ddl-file
    Custom DDL file resource for JBeret table creation; if using custom table names, please also set the sql-file property to propagate the table names.
    Environment variable: QUARKUS_JBERET_REPOSITORY_JDBC_DDL_FILE
    Type: string

  quarkus.jberet.repository.jdbc.sql-file
    Custom queries to be used to query JBeret tables; this is mandatory if custom table names are used in the custom DDL file. The file must be of type properties, and must follow the exact template defined in jberet.properties.
    Environment variable: QUARKUS_JBERET_REPOSITORY_JDBC_SQL_FILE
    Type: string

  quarkus.jberet.repository.jdbc.db-table-prefix
    JBeret table name prefix.
    Environment variable: QUARKUS_JBERET_REPOSITORY_JDBC_DB_TABLE_PREFIX
    Type: string

  quarkus.jberet.repository.jdbc.db-table-suffix
    JBeret table name suffix.
    Environment variable: QUARKUS_JBERET_REPOSITORY_JDBC_DB_TABLE_SUFFIX
    Type: string

Schedule a Job

While a Job can be executed by calling JobOperator.start, the most common scenario is to configure each Job to execute on a schedule:

quarkus.jberet.job."fileCleanupJob".cron=0 0 23 ? * * *

The cron expression 0 0 23 ? * * * instructs the JobScheduler to execute the Job at 23:00:00 every day. The syntax used for cron expressions is based on Quartz. See Cron Trigger for additional information.

The batch runtime exposes a JobScheduler that keeps track of all scheduled jobs:

import jakarta.inject.Inject;
import org.jberet.schedule.JobScheduler;

@Inject
JobScheduler jobScheduler;

With the JobScheduler, it is possible to list, add, or cancel scheduled jobs.
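
For example, a sketch that lists and cancels schedules (the JobSchedule accessor names here are assumptions based on the org.jberet.schedule API, not taken from this guide):

import java.util.List;

import org.jberet.schedule.JobSchedule;

// List the schedules known to the scheduler and cancel each of them
List<JobSchedule> jobSchedules = jobScheduler.getJobSchedules();
for (JobSchedule jobSchedule : jobSchedules) {
    jobScheduler.cancel(jobSchedule.getId());
}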

Testing

Since Batch applications execute jobs asynchronously, the testing code must explicitly wait for the job to complete; otherwise the test finishes right after launching the job, before the job under test has run. You can use Awaitility to poll the Job status and keep the test running until the job finishes:

pom.xml
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("org.awaitility:awaitility")

And the test:

FileCleanupJobTest
package org.acme.batch;

import static org.awaitility.Awaitility.await;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import jakarta.batch.operations.JobOperator;
import jakarta.batch.runtime.BatchStatus;
import jakarta.batch.runtime.JobExecution;
import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
class FileCleanupJobTest {
    @Inject
    JobOperator jobOperator;

    @Test
    void fileCleanup() {
        Properties properties = new Properties();
        properties.setProperty("directory", "/tmp/batch");
        properties.setProperty("daysToKeep", "30");
        long executionId = jobOperator.start("fileCleanupJob", properties); (1)

        await().atMost(5, TimeUnit.SECONDS) (2)
                .until(() -> {
                    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
                    return BatchStatus.COMPLETED.equals(jobExecution.getBatchStatus()); (3)
                });
    }
}
1 Execute the job fileCleanupJob and keep a reference to the executionId
2 Wait at most 5 seconds for the condition to become true; if the condition is still false when the timeout expires, the wait is aborted and the test fails
3 Query JobOperator for the status of the Job and check if the Job reached a status of BatchStatus.COMPLETED