Quarkus JBeret Components
The Quarkus JBeret Components module provides reusable batch components for common data processing tasks.
Installation
To use the JBeret Components module, add the io.quarkiverse.jberet:quarkus-jberet-components extension to your
build file.
Maven:
<dependency>
    <groupId>io.quarkiverse.jberet</groupId>
    <artifactId>quarkus-jberet-components</artifactId>
    <version>2.9.1</version>
</dependency>
Gradle:
implementation("io.quarkiverse.jberet:quarkus-jberet-components:2.9.1")
Read and Write Data
JDBC Database
The JDBC components provide efficient reading and writing of database records using JDBC cursors and batch processing. These components are ideal for:
- Processing large database tables without loading all data into memory
- Migrating data between databases
- Generating aggregated statistics from database records
- Bulk insert/update operations with optimal performance
JdbcCursorItemReader
The JdbcCursorItemReader reads data from a database using a JDBC cursor: it streams each row produced by the
supplied SQL statement one at a time, without loading the entire result set into memory.
package org.acme.batch.components.jdbc;
import javax.sql.DataSource;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.jdbc.JdbcCursorItemReader;
@Singleton
public class AuctionJdbcCursorItemReaderProducer {
@Inject
DataSource dataSource;
@Inject
AuctionStatisticsRowMapper rowMapper;
@Produces
@Dependent
@Named("auctionsItemReader")
public JdbcCursorItemReader<AuctionStatistics> auctionsItemReader() {
String sql = """
SELECT
itemId,
sum(quantity) as totalQuantity,
sum(bid) as totalBid,
sum(buyout) as totalBuyout,
min(bid / quantity) as minBid,
min(buyout / quantity) as minBuyout,
max(bid / quantity) as maxBid,
max(buyout / quantity) as maxBuyout
FROM Auctions
GROUP BY itemId
ORDER BY itemId
""";
return new JdbcCursorItemReader<>(dataSource, sql, rowMapper);
}
}
The JdbcCursorItemReader requires:
- A DataSource to read the data
- A SQL query to execute to retrieve the data
- A RowMapper to convert each ResultSet row into a custom POJO
RowMapper
The RowMapper is a functional interface that maps a JDBC ResultSet row to a POJO:
package org.acme.batch.components.jdbc;
import java.sql.ResultSet;
import java.sql.SQLException;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.jdbc.RowMapper;
@Singleton
@Named
public class AuctionStatisticsRowMapper implements RowMapper<AuctionStatistics> {
@Override
public AuctionStatistics mapRow(ResultSet resultSet) throws SQLException {
int itemId = resultSet.getInt(1);
long quantity = resultSet.getLong(2);
long bid = resultSet.getLong(3);
long buyout = resultSet.getLong(4);
long minBid = resultSet.getLong(5);
long minBuyout = resultSet.getLong(6);
long maxBid = resultSet.getLong(7);
long maxBuyout = resultSet.getLong(8);
Double avgBid = (double) bid / quantity;
Double avgBuyout = (double) buyout / quantity;
return new AuctionStatistics(itemId, quantity, bid, minBid, maxBid, buyout, minBuyout, maxBuyout, avgBid, avgBuyout);
}
}
package org.acme.batch.components.jdbc;
public record AuctionStatistics(
Integer itemId,
Long quantity,
Long bid,
Long minBid,
Long maxBid,
Long buyout,
Long minBuyout,
Long maxBuyout,
Double avgBid,
Double avgBuyout) {
}
The RowMapper retrieves values from the ResultSet by column index and constructs the AuctionStatistics object.
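One pitfall when deriving averages from two integral columns in a RowMapper: the placement of the cast decides whether the division is integer or floating-point. A small standalone check (plain Java, independent of the batch API):

```java
// Illustrative only: shows why the cast must happen before dividing
// when computing averages from two integral columns.
public class CastCheck {
    public static void main(String[] args) {
        long bid = 7;
        long quantity = 2;

        // Integer division happens first, so the fraction is lost:
        double truncated = (double) (bid / quantity);
        // The cast promotes the dividend, so the division is floating-point:
        double exact = (double) bid / quantity;

        System.out.println(truncated); // 3.0
        System.out.println(exact);     // 3.5
    }
}
```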
JdbcBatchItemWriter
The JdbcBatchItemWriter writes data to a database using JDBC batch processing. Instead of executing one SQL
statement per item, it groups multiple statements together and executes them in a single database operation.
package org.acme.batch.components.jdbc;
import javax.sql.DataSource;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.jdbc.JdbcBatchItemWriter;
@Singleton
public class AuctionJdbcBatchItemWriterProducer {
@Inject
DataSource dataSource;
@Inject
AuctionStatisticsParameterSetter parameterSetter;
@Produces
@Dependent
@Named("auctionsItemWriter")
public JdbcBatchItemWriter<AuctionStatistics> auctionsItemWriter() {
String sql = """
INSERT INTO AuctionStatistics (
id, itemId, quantity, bid, minBid, maxBid,
buyout, minBuyout, maxBuyout, avgBid, avgBuyout, timestamp
) VALUES (nextval('auction_statistics_id'), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""";
return new JdbcBatchItemWriter<>(dataSource, sql, parameterSetter);
}
}
The JdbcBatchItemWriter requires:
- A DataSource to write the data
- A parameterized SQL statement to execute for each item to write
- A ParameterSetter to map objects into SQL parameters
ParameterSetter
The ParameterSetter is a functional interface that sets PreparedStatement parameters from a POJO:
package org.acme.batch.components.jdbc;
import java.sql.SQLException;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.jdbc.ParameterSetter;
@Singleton
@Named
public class AuctionStatisticsParameterSetter implements ParameterSetter<AuctionStatistics> {
@Override
public void setValues(Parameters parameters, AuctionStatistics value) throws SQLException {
parameters.setInt(1, value.itemId());
parameters.setLong(2, value.quantity());
parameters.setLong(3, value.bid());
parameters.setLong(4, value.minBid());
parameters.setLong(5, value.maxBid());
parameters.setLong(6, value.buyout());
parameters.setLong(7, value.minBuyout());
parameters.setLong(8, value.maxBuyout());
parameters.setDouble(9, value.avgBid());
parameters.setDouble(10, value.avgBuyout());
parameters.setLong(11, System.currentTimeMillis());
}
}
The ParameterSetter extracts values from an object and sets them as PreparedStatement parameters by index in
Parameters.
The Job
All JDBC components must be assembled in a Job definition:
<?xml version="1.0" encoding="UTF-8"?>
<job id="auctionsJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk>
<reader ref="auctionsItemReader"/> (1)
<writer ref="auctionsItemWriter"/> (2)
</chunk>
</step>
</job>
| 1 | The auctionsItemReader is the CDI bean name of the JdbcCursorItemReader produced by the AuctionJdbcCursorItemReaderProducer |
| 2 | The auctionsItemWriter is the CDI bean name of the JdbcBatchItemWriter produced by the AuctionJdbcBatchItemWriterProducer |
To execute this Job:
@Inject
JobOperator jobOperator;
void execute() {
long executionId = jobOperator.start("auctionsJob", new Properties());
}
Configuration with Batch Properties
Instead of using CDI producers, the JdbcCursorItemReader and JdbcBatchItemWriter can be configured directly
in the Job XML using batch properties and their built-in reference names jdbcItemReader and jdbcItemWriter:
<?xml version="1.0" encoding="UTF-8"?>
<job id="auctionsJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk item-count="100">
<reader ref="jdbcItemReader"> (1)
<properties>
<property name="sql"
value="SELECT itemId, sum(quantity), sum(bid), sum(buyout), min(bid / quantity), min(buyout / quantity), max(bid / quantity), max(buyout / quantity) FROM Auctions GROUP BY itemId ORDER BY itemId" /> (2)
<property name="rowMapper" value="auctionStatisticsRowMapper" /> (3)
</properties>
</reader>
<writer ref="jdbcItemWriter"> (4)
<properties>
<property name="sql"
value="INSERT INTO AuctionStatistics (id, itemId, quantity, bid, minBid, maxBid, buyout, minBuyout, maxBuyout, avgBid, avgBuyout, timestamp) VALUES (nextval('auction_statistics_id'), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" /> (5)
<property name="parameterSetter" value="auctionStatisticsParameterSetter" /> (6)
</properties>
</writer>
</chunk>
</step>
</job>
| 1 | Reference the built-in jdbcItemReader JdbcCursorItemReader |
| 2 | Specify the SQL query to execute to retrieve the data |
| 3 | Specify the CDI bean name of the RowMapper |
| 4 | Reference the built-in jdbcItemWriter JdbcBatchItemWriter |
| 5 | Specify SQL statement to execute for each item to write |
| 6 | Specify the CDI bean name of the ParameterSetter |
Additionally, the following properties can be configured for the jdbcItemReader:
- fetchSize (optional): hints to the JDBC driver how many rows to fetch from the database:
  <property name="fetchSize" value="1"/>
Flat File
The Flat File components provide reading and writing of flat files (CSV or any line-based format) using custom line mapping and formatting. These components are ideal for:
- Importing data from CSV or other delimited files
- Exporting batch processing results to flat files
- Transforming between file formats
- Processing large files line by line without loading the entire file into memory
FlatFileItemReader
The FlatFileItemReader reads data from a flat file, mapping each line into an object using a LineMapper. The
resource is resolved first as a file path and then as a classpath resource.
package org.acme.batch.components.file;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.file.FlatFileItemReader;
@Singleton
public class AuctionFlatFileItemReaderProducer {
@Inject
AuctionLineMapper lineMapper;
@Produces
@Dependent
@Named("auctionsItemReader")
public FlatFileItemReader<Auction> auctionsItemReader() {
return new FlatFileItemReader<>("auctions.csv", lineMapper)
.setLinesToSkip(1);
}
}
The FlatFileItemReader requires:
- A resource to read (file path or classpath resource)
- A LineMapper to convert each line into a custom POJO
LineMapper
The LineMapper is a functional interface that maps a line of text to a POJO:
package org.acme.batch.components.file;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.file.LineMapper;
@Singleton
@Named
public class AuctionLineMapper implements LineMapper<Auction> {
@Override
public Auction mapLine(String line, int lineNumber) {
String[] parts = line.split(",");
return new Auction(
Long.parseLong(parts[0].trim()),
Integer.parseInt(parts[1].trim()),
Long.parseLong(parts[2].trim()),
Long.parseLong(parts[3].trim()),
Integer.parseInt(parts[4].trim()));
}
}
package org.acme.batch.components.file;
public record Auction(long id, int itemId, long bid, long buyout, int quantity) {
}
The LineMapper receives the line content and the line number (starting at 1) and constructs the Auction object.
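The split-and-trim mapping above can be exercised on its own, outside the batch runtime. The sketch below mirrors AuctionLineMapper without the library's LineMapper interface, so it runs with nothing but the JDK:

```java
public class LineMapperSketch {
    // Same shape as the Auction record in the document.
    record Auction(long id, int itemId, long bid, long buyout, int quantity) {}

    // Mirrors AuctionLineMapper.mapLine: split on commas, trim, parse each field.
    static Auction mapLine(String line) {
        String[] parts = line.split(",");
        return new Auction(
                Long.parseLong(parts[0].trim()),
                Integer.parseInt(parts[1].trim()),
                Long.parseLong(parts[2].trim()),
                Long.parseLong(parts[3].trim()),
                Integer.parseInt(parts[4].trim()));
    }

    public static void main(String[] args) {
        Auction auction = mapLine("1, 42, 100, 200, 5");
        System.out.println(auction.itemId());   // 42
        System.out.println(auction.quantity()); // 5
    }
}
```

One caveat of this parsing style: String.split(",") drops trailing empty strings, so if the last column may be empty, split(",", -1) is the safer call.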
FlatFileItemWriter
The FlatFileItemWriter writes data to a flat file, formatting each item into a line using a LineFormatter.
package org.acme.batch.components.file;
import java.nio.file.Path;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.file.FlatFileItemWriter;
@Singleton
public class AuctionFlatFileItemWriterProducer {
@Inject
AuctionLineFormatter lineFormatter;
@Produces
@Dependent
@Named("auctionsItemWriter")
public FlatFileItemWriter<Auction> auctionsItemWriter() {
return new FlatFileItemWriter<>(Path.of("/tmp/auctions-output.csv"), lineFormatter);
}
}
The FlatFileItemWriter requires:
- A Path to the file to write
- A LineFormatter to format each object into a line
LineFormatter
The LineFormatter is a functional interface that formats a POJO into a line of text:
package org.acme.batch.components.file;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import io.quarkiverse.jberet.components.runtime.item.file.LineFormatter;
@Singleton
@Named
public class AuctionLineFormatter implements LineFormatter<Auction> {
@Override
public String formatLine(Auction auction) {
return auction.id() + "," + auction.itemId() + "," + auction.bid() + "," + auction.buyout() + ","
+ auction.quantity();
}
}
The LineFormatter extracts values from an object and returns a String representation.
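Because the formatter emits exactly the shape the AuctionLineMapper parses, a format/parse round-trip makes an easy sanity check. This is a standalone sketch that copies both methods' logic rather than using the library interfaces:

```java
public class LineFormatterSketch {
    record Auction(long id, int itemId, long bid, long buyout, int quantity) {}

    // Mirrors AuctionLineFormatter.formatLine.
    static String formatLine(Auction a) {
        return a.id() + "," + a.itemId() + "," + a.bid() + "," + a.buyout() + "," + a.quantity();
    }

    // Mirrors AuctionLineMapper.mapLine.
    static Auction mapLine(String line) {
        String[] p = line.split(",");
        return new Auction(Long.parseLong(p[0].trim()), Integer.parseInt(p[1].trim()),
                Long.parseLong(p[2].trim()), Long.parseLong(p[3].trim()), Integer.parseInt(p[4].trim()));
    }

    public static void main(String[] args) {
        Auction original = new Auction(1, 42, 100, 200, 5);
        String line = formatLine(original);
        System.out.println(line);                           // 1,42,100,200,5
        System.out.println(mapLine(line).equals(original)); // true (record equality is field-wise)
    }
}
```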
The Job
All Flat File components must be assembled in a Job definition:
<?xml version="1.0" encoding="UTF-8"?>
<job id="auctionsJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk>
<reader ref="auctionsItemReader"/> (1)
<writer ref="auctionsItemWriter"/> (2)
</chunk>
</step>
</job>
| 1 | The auctionsItemReader is the CDI bean name of the FlatFileItemReader produced by the AuctionFlatFileItemReaderProducer |
| 2 | The auctionsItemWriter is the CDI bean name of the FlatFileItemWriter produced by the AuctionFlatFileItemWriterProducer |
To execute this Job:
@Inject
JobOperator jobOperator;
void execute() {
long executionId = jobOperator.start("auctionsJob", new Properties());
}
Configuration with Batch Properties
Instead of using CDI producers, the FlatFileItemReader and FlatFileItemWriter can be configured directly
in the Job XML using batch properties and their built-in reference names flatFileItemReader and flatFileItemWriter:
<?xml version="1.0" encoding="UTF-8"?>
<job id="auctionsJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk item-count="100">
<reader ref="flatFileItemReader"> (1)
<properties>
<property name="resource" value="auctions.csv" /> (2)
<property name="lineMapper" value="auctionLineMapper" /> (3)
</properties>
</reader>
<writer ref="flatFileItemWriter"> (4)
<properties>
<property name="resource" value="/tmp/auctions-output.csv" /> (5)
<property name="lineFormatter" value="auctionLineFormatter" /> (6)
</properties>
</writer>
</chunk>
</step>
</job>
| 1 | Reference the built-in flatFileItemReader FlatFileItemReader |
| 2 | Specify the resource to read (file path or classpath resource) |
| 3 | Specify the CDI bean name of the LineMapper |
| 4 | Reference the built-in flatFileItemWriter FlatFileItemWriter |
| 5 | Specify the file path to write |
| 6 | Specify the CDI bean name of the LineFormatter |
Additionally, the following properties can be configured:
For flatFileItemReader:
- encoding (optional): the file encoding to use when reading the file. Defaults to UTF-8:
  <property name="encoding" value="ISO-8859-1"/>
- linesToSkip (optional): the number of header lines to skip at the beginning of the file. Defaults to 0:
  <property name="linesToSkip" value="1"/>
For flatFileItemWriter:
- encoding (optional): the file encoding to use when writing the file. Defaults to UTF-8:
  <property name="encoding" value="ISO-8859-1"/>
- lineSeparator (optional): the line separator to use. Defaults to the system line separator:
  <property name="lineSeparator" value="\n"/>
- append (optional): whether to append to the file instead of overwriting it. Defaults to false:
  <property name="append" value="true"/>
MongoDB
The MongoDB components provide efficient reading and writing of documents using the MongoDB Java Driver with full transaction support. These components are ideal for:
- Processing large MongoDB collections without loading all documents into memory
- Migrating data between MongoDB databases
- Transforming and enriching MongoDB documents
- Integrating MongoDB with other data sources in batch jobs
| The MongoDB components require the Quarkus MongoDB Client extension. |
To use the MongoDB components, add the Mongo Client extension to your build file.
Maven:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-mongodb-client</artifactId>
</dependency>
Gradle:
implementation("io.quarkus:quarkus-mongodb-client")
| It also requires the JBeret Components dependency. See Installation. |
MongoCursorItemReader
The MongoCursorItemReader reads documents from a MongoDB collection using a cursor, which streams documents one at a
time without loading the entire collection into memory.
package org.acme.batch.components.mongo;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import io.quarkiverse.jberet.components.runtime.item.mongo.MongoCursorItemReader;
@Singleton
public class AuctionMongoCursorItemReaderProducer {
@Inject
MongoClient mongoClient;
@Produces
@Dependent
@Named("auctionsMongoItemReader")
public MongoCursorItemReader<Auction> auctionsMongoItemReader() {
return new MongoCursorItemReader<>(mongoClient, "auctions", "auctions", Auction.class)
.setFilter(Filters.gt("buyout", 40000))
.setSort(Sorts.ascending("itemId"))
.setLimit(1000);
}
}
The MongoCursorItemReader requires:
- A MongoClient to connect to MongoDB
- A database name
- A collection name
- The document class type
The reader supports MongoDB query operations:
- filter: query filter using com.mongodb.client.model.Filters
- projection: field projection using com.mongodb.client.model.Projections
- sort: sort order using com.mongodb.client.model.Sorts
- hint: index hint for query optimization
- limit: maximum number of documents to read
- skip: number of documents to skip
- maxTime: maximum execution time
- batchSize: number of documents to fetch per batch
MongoItemWriter
The MongoItemWriter writes documents to a MongoDB collection with full transaction support. The writer integrates
with JTA transactions, ensuring that MongoDB writes are committed or rolled back together with the batch job’s
transaction boundaries.
package org.acme.batch.components.mongo;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import com.mongodb.client.MongoClient;
import io.quarkiverse.jberet.components.runtime.item.mongo.MongoItemWriter;
@Singleton
public class AuctionMongoItemWriterProducer {
@Inject
MongoClient mongoClient;
@Produces
@Dependent
@Named("auctionsMongoItemWriter")
public MongoItemWriter<Auction> auctionsMongoItemWriter() {
return new MongoItemWriter<>(mongoClient, "auctions", "auctions", Auction.class);
}
}
The MongoItemWriter requires:
- A MongoClient to connect to MongoDB
- A database name
- A collection name
- The document class type
The Job
All MongoDB components must be assembled in a Job definition:
<?xml version="1.0" encoding="UTF-8"?>
<job id="mongoJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk item-count="100">
<reader ref="auctionsMongoItemReader"/> (1)
<writer ref="auctionsMongoItemWriter"/> (2)
</chunk>
</step>
</job>
| 1 | The auctionsMongoItemReader is the CDI bean name of the MongoCursorItemReader produced by the AuctionMongoCursorItemReaderProducer |
| 2 | The auctionsMongoItemWriter is the CDI bean name of the MongoItemWriter produced by the AuctionMongoItemWriterProducer |
To execute this Job:
@Inject
JobOperator jobOperator;
void execute() {
long executionId = jobOperator.start("mongoJob", new Properties());
}
Configuration with Batch Properties
Instead of using CDI producers, the MongoCursorItemReader and MongoItemWriter can be configured directly
in the Job XML using batch properties and their built-in reference names mongoItemReader and mongoItemWriter:
<?xml version="1.0" encoding="UTF-8"?>
<job id="mongoJob" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="processAuctions">
<chunk item-count="100">
<reader ref="mongoItemReader"> (1)
<properties>
<property name="database" value="auctions"/> (2)
<property name="collection" value="auctions"/> (3)
<property name="itemType" value="io.example.Auction"/> (4)
<property name="filter" value="{'buyout': {'$gt': 40000}}"/> (5)
<property name="sort" value="{'itemId': 1}"/> (6)
<property name="limit" value="1000"/>
</properties>
</reader>
<writer ref="mongoItemWriter"> (7)
<properties>
<property name="database" value="auctions"/>
<property name="collection" value="auctions"/>
<property name="itemType" value="io.example.Auction"/>
</properties>
</writer>
</chunk>
</step>
</job>
| 1 | Reference the built-in mongoItemReader MongoCursorItemReader |
| 2 | Specify the MongoDB database name |
| 3 | Specify the MongoDB collection name |
| 4 | Specify the fully qualified class name of the document type |
| 5 | Specify the query filter as a JSON string |
| 6 | Specify the sort order as a JSON string |
| 7 | Reference the built-in mongoItemWriter MongoItemWriter |
Additionally, the following properties can be configured for the mongoItemReader:
- filter (optional): the query filter to apply to the query, specified as a MongoDB Extended JSON string.
  <property name="filter" value="{'buyout': {'$gt': 40000}}"/>
- projection (optional): a document describing the fields to return for all matching documents, specified as a JSON object. Use 1 to include a field and 0 to exclude it.
  <property name="projection" value="{'itemId': 1, 'bid': 1, 'buyout': 1}"/>
- sort (optional): the sort criteria to apply to the query, specified as a JSON object. Use 1 for ascending order and -1 for descending order.
  <property name="sort" value="{'itemId': 1}"/>
- hint (optional): the hint for which index to use, specified as a JSON object. Use this to optimize query performance on large collections.
  <property name="hint" value="{'buyout': 1}"/>
- limit (optional): the maximum number of documents to read. Use 0 or omit this property for no limit.
  <property name="limit" value="1000"/>
- skip (optional): the number of documents to skip before reading. Useful for pagination or resuming from a specific point.
  <property name="skip" value="100"/>
- maxTime (optional): the maximum execution time on the server for this operation, specified as an ISO-8601 duration string (e.g., PT30S for 30 seconds).
  <property name="maxTime" value="PT1M"/> <!-- 1 minute -->
- batchSize (optional): the number of documents to return per batch from the MongoDB server.
  <property name="batchSize" value="100"/>
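ISO-8601 duration strings like PT1M follow the same format java.time.Duration understands, so a maxTime value can be sanity-checked with Duration.parse before putting it in the Job XML (this only validates the format; whether the component parses it with this exact class is an assumption):

```java
import java.time.Duration;

public class MaxTimeCheck {
    public static void main(String[] args) {
        // ISO-8601 durations: PT30S = 30 seconds, PT1M = 1 minute, PT1H30M = 90 minutes.
        System.out.println(Duration.parse("PT30S").toSeconds());   // 30
        System.out.println(Duration.parse("PT1M").toSeconds());    // 60
        System.out.println(Duration.parse("PT1H30M").toMinutes()); // 90
    }
}
```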
Repositories
JPARepository
The JPA Repository stores batch job metadata (job instances, executions, and step executions) using JPA entities and Hibernate ORM. This provides broader database support through JPA and can leverage first or second level caches for improved performance.
To use the JPA Repository, add the Hibernate ORM extension to your build file.
Maven:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-hibernate-orm</artifactId>
</dependency>
Gradle:
implementation("io.quarkus:quarkus-hibernate-orm")
| It also requires the JBeret Components dependency. See Installation. |
Configuration
To use the JPA Repository, set the repository type to jpa and configure a datasource:
quarkus.datasource.db-kind=h2
quarkus.datasource.jdbc.url=jdbc:h2:mem:test
quarkus.jberet.repository.type=jpa
|
The JPA Repository uses the default (unnamed) persistence unit by default. |
For applications with multiple persistence units, specify which persistence unit to use for JBeret entities:
quarkus.datasource."batch".db-kind=postgresql
quarkus.datasource."batch".username=<your username>
quarkus.datasource."batch".password=<your password>
quarkus.datasource."batch".jdbc.url=jdbc:postgresql://localhost:5432/batch
quarkus.hibernate-orm."batch".datasource=batch
quarkus.jberet.repository.type=jpa
quarkus.jberet.repository.jpa.persistence-unit-name=batch
|
The JBeret JPA entities are automatically registered with the specified persistence unit. |
For more information, please check Configuring a JobRepository.
Configuration property fixed at build time - All other configuration properties are overridable at runtime
| Configuration property | Type | Default |
|---|---|---|
| quarkus.jberet.repository.jpa.persistence-unit-name - The persistence unit name for JBeret entities. By default, it uses the default persistence unit name from the Hibernate ORM extension. Environment variable: QUARKUS_JBERET_REPOSITORY_JPA_PERSISTENCE_UNIT_NAME | string | <default> |