Quarkus Fluss

A Quarkus extension providing a SmallRye Reactive Messaging connector for Apache Fluss (Incubating), a streaming storage system built for real-time analytics.

Use standard @Incoming and @Outgoing annotations to consume from and produce to Fluss Log Tables.

Prerequisites

  • Java 17+

  • Maven 3.9+

  • A running Apache Fluss cluster (default: localhost:9123)

The Fluss client uses Apache Arrow internally, which requires --add-opens=java.base/java.nio=ALL-UNNAMED at JVM startup. Add this to your Quarkus Maven plugin configuration.

Installation

To use this extension, first add the io.quarkiverse.fluss:quarkus-messaging-fluss extension to your build file.

For instance, with Maven, add the following dependency to your POM file:

<dependency>
    <groupId>io.quarkiverse.fluss</groupId>
    <artifactId>quarkus-messaging-fluss</artifactId>
    <version>0</version>
</dependency>

And configure the JVM flag required by the Fluss client:

<plugin>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-maven-plugin</artifactId>
    <configuration>
        <jvmArgs>--add-opens=java.base/java.nio=ALL-UNNAMED</jvmArgs>
    </configuration>
</plugin>

Configuration

Configure channels in your application.properties using the smallrye-fluss connector name.

Incoming (consuming from Fluss)

mp.messaging.incoming.my-channel.connector=smallrye-fluss
mp.messaging.incoming.my-channel.bootstrap-servers=localhost:9123
mp.messaging.incoming.my-channel.database=my_db
mp.messaging.incoming.my-channel.table=events

Outgoing (producing to Fluss)

mp.messaging.outgoing.my-channel.connector=smallrye-fluss
mp.messaging.outgoing.my-channel.bootstrap-servers=localhost:9123
mp.messaging.outgoing.my-channel.database=my_db
mp.messaging.outgoing.my-channel.table=events

Configuration Reference

| Property | Type | Default | Direction | Description |
|---|---|---|---|---|
| connector | String | | Both | Must be smallrye-fluss |
| bootstrap-servers | String | localhost:9123 | Both | Fluss cluster bootstrap address |
| database | String | fluss | Both | Fluss database name |
| table | String | (required) | Both | Fluss table name |
| offset | String | full | Incoming | Startup mode: full, earliest, latest, timestamp |
| offset.timestamp | long | | Incoming | Epoch millis, required when offset=timestamp |
| columns | String | | Incoming | Comma-separated column names for projection |
| poll-timeout | int | 100 | Incoming | Poll timeout in milliseconds |
| batch-size | int | 100 | Outgoing | Number of records before flushing |
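Several of these options can be combined on one channel. A sketch of a tuned incoming channel (the channel name events, database, and column names are illustrative):

```properties
mp.messaging.incoming.events.connector=smallrye-fluss
mp.messaging.incoming.events.bootstrap-servers=localhost:9123
mp.messaging.incoming.events.database=my_db
mp.messaging.incoming.events.table=events
mp.messaging.incoming.events.offset=earliest
mp.messaging.incoming.events.columns=sensor_id,temperature
mp.messaging.incoming.events.poll-timeout=250
```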

Usage

Consuming messages

Incoming messages carry an InternalRow payload representing a row from a Fluss table.

@ApplicationScoped
public class FlussConsumer {

    @Incoming("events")
    public void consume(InternalRow row) {
        String id    = row.getString(0).toString();
        int    value = row.getInt(1);
        System.out.println("Received: id=" + id + ", value=" + value);
    }
}

Accessing Fluss metadata

Each message includes FlussMessageMetadata with the table path, bucket, offset and change type.

@ApplicationScoped
public class FlussConsumerWithMetadata {

    @Incoming("events")
    public CompletionStage<Void> consume(Message<InternalRow> message) {
        message.getMetadata(FlussMessageMetadata.class).ifPresent(meta -> {
            System.out.println("Table: " + meta.getTablePath());
            System.out.println("Bucket: " + meta.getBucketId());
            System.out.println("Offset: " + meta.getOffset());
            System.out.println("ChangeType: " + meta.getChangeType());
        });
        return message.ack();
    }
}

Producing messages

Outgoing messages must carry a GenericRow (or any InternalRow) payload matching the target table schema.

@ApplicationScoped
public class FlussProducer {

    @Outgoing("output")
    public Multi<GenericRow> produce() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> {
                    GenericRow row = new GenericRow(3);
                    row.setField(0, BinaryString.fromString("event-" + tick));
                    row.setField(1, tick.intValue());
                    row.setField(2, System.currentTimeMillis());
                    return row;
                });
    }
}
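Besides returning a reactive stream, you can push records imperatively with a standard MicroProfile Reactive Messaging Emitter. A sketch, assuming a three-column target table like the one above:

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class ImperativeFlussProducer {

    @Inject
    @Channel("output")
    Emitter<GenericRow> emitter; // GenericRow and BinaryString come from the Fluss client

    public void send(String id, int value) {
        GenericRow row = new GenericRow(3);
        row.setField(0, BinaryString.fromString(id));
        row.setField(1, value);
        row.setField(2, System.currentTimeMillis());
        // send() returns a CompletionStage<Void> that completes on acknowledgement
        emitter.send(row);
    }
}
```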

Start reading position

The offset property controls where the connector begins reading.

| Mode | Description |
|---|---|
| full | Default. Reads from the earliest offset. For PK tables, reads from the earliest changelog offset. |
| earliest | Starts reading from the earliest available offset. |
| latest | Starts reading from the latest offset; only new records arriving after startup are consumed. |
| timestamp | Starts reading from the offset closest to a given timestamp. Requires offset.timestamp (epoch milliseconds). |

# Read only new records
mp.messaging.incoming.events.offset=latest

# Read from a specific point in time
mp.messaging.incoming.events.offset=timestamp
mp.messaging.incoming.events.offset.timestamp=1713200000000
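The timestamp is plain epoch milliseconds, so rather than hard-coding a literal you can derive it with java.time; for example:

```java
import java.time.Instant;

public class OffsetTimestamp {
    public static void main(String[] args) {
        // Epoch millis for a concrete UTC instant, suitable for offset.timestamp
        long ts = Instant.parse("2024-04-15T00:00:00Z").toEpochMilli();
        System.out.println(ts); // prints 1713139200000
    }
}
```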

Column projection

Reduce network overhead by fetching only the columns you need:

mp.messaging.incoming.events.columns=sensor_id,temperature,timestamp

The InternalRow you receive will only contain the projected columns (indexed starting at 0 in projection order).
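With that projection in place, the consumer indexes the row in projection order. A sketch, assuming sensor_id is a string, temperature a double, and timestamp a long in the example table:

```java
@ApplicationScoped
public class ProjectedConsumer {

    @Incoming("events")
    public void consume(InternalRow row) {
        // Indices follow the projection order, not the full table schema order
        String sensorId    = row.getString(0).toString();
        double temperature = row.getDouble(1);
        long   timestamp   = row.getLong(2);
        System.out.println(sensorId + ": " + temperature + " @ " + timestamp);
    }
}
```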

Current Limitations

  • No full snapshot read for PK Tables — the full offset mode does not yet read the initial snapshot for Primary Key Tables; it falls back to earliest (changelog only)

  • No consumer offset tracking — offsets are not persisted between restarts

  • No health checks — MicroProfile Health integration is not yet implemented

  • No dev services — no automatic Fluss container startup in dev mode

Extension Configuration Reference