Quarkus Fluss
A Quarkus extension providing a SmallRye Reactive Messaging connector for Apache Fluss (Incubating), a streaming storage system built for real-time analytics.
Use standard @Incoming and @Outgoing annotations to consume from and produce to Fluss Log Tables.
Prerequisites
- Java 17+
- Maven 3.9+
- A running Apache Fluss cluster (default: localhost:9123)
The Fluss client uses Apache Arrow internally, which requires --add-opens=java.base/java.nio=ALL-UNNAMED at JVM startup. Add this to your Quarkus Maven plugin configuration.
Installation
To use this extension, first add the io.quarkiverse.fluss:quarkus-messaging-fluss dependency to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
    <groupId>io.quarkiverse.fluss</groupId>
    <artifactId>quarkus-messaging-fluss</artifactId>
    <version>0</version>
</dependency>
And configure the JVM flag required by the Fluss client:
<plugin>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-maven-plugin</artifactId>
    <configuration>
        <jvmArgs>--add-opens=java.base/java.nio=ALL-UNNAMED</jvmArgs>
    </configuration>
</plugin>
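To confirm at runtime that the flag is in effect, an application can ask the Java module system whether java.nio is open to its own module. The following is a minimal sketch using the standard java.lang.Module API; the class ArrowAccessCheck and its methods are hypothetical names chosen for illustration and are not part of the extension.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the extension: reports whether
// --add-opens=java.base/java.nio=ALL-UNNAMED is in effect for this class.
final class ArrowAccessCheck {

    // True when the java.nio package of java.base is open to the module
    // that loaded this class (the unnamed module for classpath applications).
    static boolean isJavaNioOpen() {
        Module javaBase = ByteBuffer.class.getModule();
        Module caller = ArrowAccessCheck.class.getModule();
        return javaBase.isOpen("java.nio", caller);
    }

    // Human-readable summary that can be logged at startup.
    static String describe() {
        return isJavaNioOpen()
                ? "java.nio is open to this module"
                : "java.nio is NOT open: add --add-opens=java.base/java.nio=ALL-UNNAMED";
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Logging the result of describe() early in startup may make a missing flag easier to spot than a reflection error raised later by the client.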
Configuration
Configure channels in your application.properties using the smallrye-fluss connector name.
Incoming (consuming from Fluss)
mp.messaging.incoming.my-channel.connector=smallrye-fluss
mp.messaging.incoming.my-channel.bootstrap-servers=localhost:9123
mp.messaging.incoming.my-channel.database=my_db
mp.messaging.incoming.my-channel.table=events
Outgoing (producing to Fluss)
mp.messaging.outgoing.my-channel.connector=smallrye-fluss
mp.messaging.outgoing.my-channel.bootstrap-servers=localhost:9123
mp.messaging.outgoing.my-channel.database=my_db
mp.messaging.outgoing.my-channel.table=events
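Both snippets follow the MicroProfile Reactive Messaging key pattern mp.messaging.<direction>.<channel>.<attribute>. As a rough illustration of that pattern, the sketch below assembles the same four keys programmatically. ChannelProps is a hypothetical helper written for this example, not an API of the extension.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the extension.
final class ChannelProps {

    // Builds the four channel properties shown above for one channel.
    // direction must be "incoming" or "outgoing".
    static Map<String, String> forChannel(String direction, String channel,
                                          String bootstrapServers, String database, String table) {
        if (!direction.equals("incoming") && !direction.equals("outgoing")) {
            throw new IllegalArgumentException("direction must be incoming or outgoing");
        }
        String prefix = "mp.messaging." + direction + "." + channel + ".";
        Map<String, String> props = new LinkedHashMap<>();
        props.put(prefix + "connector", "smallrye-fluss");
        props.put(prefix + "bootstrap-servers", bootstrapServers);
        props.put(prefix + "database", database);
        props.put(prefix + "table", table);
        return props;
    }

    public static void main(String[] args) {
        // Prints the incoming configuration from the example above, one key per line.
        forChannel("incoming", "my-channel", "localhost:9123", "my_db", "events")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this could, for example, be returned from a test profile's configuration overrides so that tests point at a different cluster or table.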
Configuration Reference
| Property | Type | Default | Direction | Description |
|---|---|---|---|---|
| connector | String | | Both | Must be smallrye-fluss |
| bootstrap-servers | String | localhost:9123 | Both | Fluss cluster bootstrap address |
| database | String | | Both | Fluss database name |
| table | String | (required) | Both | Fluss table name |
| offset | String | full | Incoming | Startup mode: full, earliest, latest, or timestamp |
| offset.timestamp | long | | Incoming | Epoch millis, required when offset=timestamp |
| | String | | Incoming | Comma-separated column names for projection |
| | int | | Incoming | Poll timeout in milliseconds |
| | int | | Outgoing | Number of records before flushing |
Usage
Consuming messages
Incoming messages carry an InternalRow payload representing a row from a Fluss table.
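import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
// InternalRow comes from the Fluss client; its package depends on the client version.

@ApplicationScoped
public class FlussConsumer {

    @Incoming("events")
    public void consume(InternalRow row) {
        // Columns are read by position: column 0 as a string, column 1 as an int
        String id = row.getString(0).toString();
        int value = row.getInt(1);
        System.out.println("Received: id=" + id + ", value=" + value);
    }
}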
Accessing Fluss metadata
Each message includes FlussMessageMetadata with the table path, bucket, offset and change type.
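import java.util.concurrent.CompletionStage;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
// InternalRow comes from the Fluss client, FlussMessageMetadata from this extension.

@ApplicationScoped
public class FlussConsumerWithMetadata {

    @Incoming("events")
    public CompletionStage<Void> consume(Message<InternalRow> message) {
        // The metadata is optional, so only print it when it is attached
        message.getMetadata(FlussMessageMetadata.class).ifPresent(meta -> {
            System.out.println("Table: " + meta.getTablePath());
            System.out.println("Bucket: " + meta.getBucketId());
            System.out.println("Offset: " + meta.getOffset());
            System.out.println("ChangeType: " + meta.getChangeType());
        });
        // Acknowledge the message once it has been handled
        return message.ack();
    }
}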
Producing messages
Outgoing messages must carry a GenericRow (or any InternalRow) payload matching the target table schema.
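import java.time.Duration;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
// GenericRow and BinaryString come from the Fluss client.

@ApplicationScoped
public class FlussProducer {

    @Outgoing("output")
    public Multi<GenericRow> produce() {
        // Emit one three-column row per second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> {
                    GenericRow row = new GenericRow(3);
                    row.setField(0, BinaryString.fromString("event-" + tick));
                    row.setField(1, tick.intValue());
                    row.setField(2, System.currentTimeMillis());
                    return row;
                });
    }
}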
Start reading position
The offset property controls where the connector begins reading.
| Mode | Description |
|---|---|
| full | Default. Reads from the earliest offset. For PK tables, reads from the earliest changelog offset. |
| earliest | Starts reading from the earliest available offset. |
| latest | Starts reading from the latest offset; only new records arriving after startup will be consumed. |
| timestamp | Starts reading from the offset closest to a given timestamp. Requires offset.timestamp. |
# Read only new records
mp.messaging.incoming.events.offset=latest
# Read from a specific point in time
mp.messaging.incoming.events.offset=timestamp
mp.messaging.incoming.events.offset.timestamp=1713200000000
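The timestamp value is expressed in epoch milliseconds, which is awkward to write by hand. A small sketch using java.time shows one way to derive it from a readable UTC instant; OffsetTimestamp is a hypothetical helper name, not part of the extension.

```java
import java.time.Instant;

// Hypothetical helper, not part of the extension.
final class OffsetTimestamp {

    // Converts an ISO-8601 UTC instant such as 2024-04-15T16:53:20Z
    // to epoch milliseconds.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // This instant corresponds to the value used in the example above.
        System.out.println(toEpochMillis("2024-04-15T16:53:20Z")); // prints 1713200000000
    }
}
```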
Current Limitations
- No full snapshot read for PK Tables: the full offset mode does not yet read the initial snapshot for Primary Key Tables; it falls back to earliest (changelog only)
- No consumer offset tracking: offsets are not persisted between restarts
- No health checks: MicroProfile Health integration is not yet implemented
- No dev services: no automatic Fluss container startup in dev mode
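Until offset tracking is available, one possible application-side workaround is to remember the last processed offset per bucket and skip records that were already handled when the channel re-reads from the earliest offset after a restart. The sketch below is an illustration under stated assumptions, not part of the extension: OffsetLedger is a hypothetical class, it assumes bucket ids fit in an int and offsets are longs that increase within a bucket, and it keeps its state in a local properties file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper, not part of the extension.
final class OffsetLedger {
    private final Path file;
    private final Properties state = new Properties();

    // Loads previously stored offsets if the file already exists.
    OffsetLedger(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                state.load(in);
            }
        }
    }

    // True if this (bucket, offset) pair is newer than the last recorded offset.
    boolean shouldProcess(int bucket, long offset) {
        String last = state.getProperty(Integer.toString(bucket));
        return last == null || offset > Long.parseLong(last);
    }

    // Records the offset as processed and writes the ledger to disk.
    void markProcessed(int bucket, long offset) throws IOException {
        state.setProperty(Integer.toString(bucket), Long.toString(offset));
        try (OutputStream out = Files.newOutputStream(file)) {
            state.store(out, "last processed offset per bucket");
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("fluss-offsets", ".properties");
        OffsetLedger ledger = new OffsetLedger(file);
        ledger.markProcessed(0, 41L);

        OffsetLedger afterRestart = new OffsetLedger(file); // simulates a restart
        System.out.println(afterRestart.shouldProcess(0, 41L)); // prints false
        System.out.println(afterRestart.shouldProcess(0, 42L)); // prints true
    }
}
```

In a consumer, the bucket and offset would come from FlussMessageMetadata (getBucketId() and getOffset()). Writing the file on every record is slow, so a real application would likely batch the writes, and an application reading several tables would need to include the table path in the key.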