Kafka Streams Processor

This extension eases the implementation of event-driven architecture streaming microservices based on Kafka Streams’ Processor API. It avoids Kafka Streams' users a lot of boilerplate code to have a dedicated topology and a proper production readiness (health, observability, error handling). All that is left to develop is a simple Processor class. The extension takes care of the rest.

Streaming processor

A streaming microservice has some requirements:

  • Kafka Streams processors are not thread-safe, so a processor scope is @jakarta.enterprise.context.Dependent by default.

  • An incoming message, from an input topic, can be acknowledged only when it has been processed and its result message has been produced and stored in a Kafka output topic.

  • If an incoming message could not be acknowledged (e.g: a microservice crash) then the incoming message will be redelivered (at-least-once guarantee).

  • Incoming messages are processed in the order of arrival (ordering guarantee)

There are currently several bootstraps or libraries that support writing stream processor-based applications on different broker technologies. This extension’s processor solution is based on Kafka Streams' Processor API.

Kafka Streams

Kafka Streams is an opensource client library that allows to write streaming applications.

It is enabled by the quarkus-kafka-streams extension:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
    <scope>runtime</scope>
</dependency>

Only one threading model is supported: 1 thread per topic-partition (see details)

The number of partitions on the source topic determines up to how many threads can consume in parallel.

Synchronous/blocking programing must be used to ensure guarantee of execution and ordering.

There is a proposal under discussion for adding asynchronous processing to Kafka Streams.

A simplified Processor API

On top of Kafka Streams, the quarkus-kafka-streams-processor extension adds features to help you to write streaming microservices.

The following dependency needs to be added:

<project>
    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
          <artifactId>quarkus-kafka-streams-processor-bom</artifactId>
          <version>${quarkus.kafkastreamsprocessor.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
        <artifactId>quarkus-kafka-streams-processor-api</artifactId>
      </dependency>
      <dependency>
        <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
        <artifactId>quarkus-kafka-streams-processor</artifactId>
        <scope>runtime</scope>
      </dependency>
    </dependencies>
</project>

With the extension, it is expected to define an annotated Kafka Streams' Processor. It is discovered as a CDI bean and instrumented by the Quarkus runtime. The difference here with the fully fledged Kafka Streams Processor API is that there is no need to build a custom Topology for each new microservice. It is built by the Quarkus runtime.

Example:

PingProcessor.java
@Slf4j
@Processor (1)
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> { (2)

  @Override
  public void process(Record<String, Ping> record) { (3)
    resultMessage = countChars(record.value().getMessage());
    Ping pong = Ping.newBuilder().setMessage(resultMessage).build();
    context().forward(record.withValue(pong)); (4)
  }

  /**
   * Counts the nb of characters and returns it as string.
   */
  private String countChars(String input) {
    return String.valueOf(input.length());
  }
}
1 Processors are beans managed by CDI, and as such need to be annotated with @io.quarkiverse.kafkastreamsprocessor.api.Processor so that any other CDI dependency may be injected. Kafka Streams' processors are not thread-safe, so a processor scope is @jakarta.enterprise.context.Dependent by default.
2 Directly extend org.apache.kafka.streams.processor.api.ContextualProcessor, specifying the type of keys and values. For keys, only type String is supported. For values, see supported types.
3 The process method is called for each incoming message. Any processing must be done synchronously.
4 The forward method allows to produce an outgoing message. Internally Kafka Streams links the acknowledgment on the outgoing message with the commit of the associated incoming message.

Topology builder

In short (see core concepts), in Kafka Streams a Topology is made of:

  • Sources: 1 or more input topics

  • Processors: a graph of 1 or more record handlers

  • Stores (optional): for stateful use cases, the processor must be assigned one or multiple stores to keep the state.

  • Sinks: 1 or more output topics

The extension will automatically build the following topology:

  • Scan for a CDI bean of type org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> (single processor). The deprecated type org.apache.kafka.streams.processor.Processor<K, V> is also supported for backward-compatibility.

  • Register a source for a single topic in kafkastreamsprocessor.input.topic, String key serializer and Protobuf value deserializer based on the declared type <V> of the processor. If multiple topics are declared the configuration should be kafkastreamsprocessor.input.topics. Also, to manipulate topics coming from different sources the property should include the source name in the declaration as kafkastreamsprocessor.input.sources.<sources>.topic.

  • Register as many sinks as topics listed in properties matching kafkastreamsprocessor.output.sinks.<sink>.topic, String key serializer and Protobuf value serializer. Sink name is taken from the properties name. If no kafkastreamsprocessor.output.sinks.<sink>.topic properties are defined and a single topic is defined in kafkastreamsprocessor.output.topic, a default sink is used.

Configuration

Please consider two kind of properties:

First class citizen properties

First class citizen properties are a set of properties prefixed with quarkus.kafka-streams. They are consumed by the Quarkus kafka-streams extension itself (used at build time and overridable, for some of them, at runtime).

In Kafka Streams Processor the following configuration keys are required:

All the properties used in the extension are recapped here

Pass-through properties

Due to the important number of properties that can be set to configure a Kafka Streams application, it’s impossible for Quarkus extension to know them all.

But it’s possible to use pass-through properties in the configuration of your application, using the name of property but prefixed with kafka-streams.

# First citizen property
quarkus.kafka-streams.bootstrap-servers=localhost:9092

# Pass-through properties
kafka-streams.producer.linger.ms=0
kafka-streams.compression.type=gzip

Override pass-through properties

Overriding pass-through properties at runtime is a little bit hacky because you need to prefix the property with quarkus. Then, you can pass it as a Java system property.

Passing the property as an environment variable with Microprofile Config convention IS NOT working.

Fo example, the possible way to override a pass-through property in a Kafka Streams Processor microservice would be to use the corresponding system property in the JVM commandline:

-Dquarkus.kafka-streams.producer.linger.ms=50

Data serialization

Data serialization refers to the process of converting complex data structures or objects into a format that can be easily transmitted over a network. The extension’s processors support different serialization formats. By analyzing the types of declared input and output payloads of the processor, it’s possible to infer the type of data to serialize and set up the correct serializer and deserializer.

As of today, the extension’s processors support three first-class citizen formats and one customization feature:

JSON is the default serialization format. If the payload cannot be serialized to JSON, an error is thrown. Depending on the Global DLQ Error management the message will be produced in the DLQ, and the incoming message will be acknowledged in the input topic.

JSON

JSON is a first class citizen serialization format, the SDK will use JSON as default serialization format.

Example

In this example, we implement a processor working with a POJO as value type. The POJO is read from kafka, or written to kafka, with the POJO’s JSON textual representation.

PojoProcessor.java
@Slf4j
@Processor (1)
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { (2)
    @Override
    public void process(Record<String, SamplePojo> record) { (3)
        String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
        log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
        SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
                !record.value().getBooleanField());
        context().forward(record.withValue(pojo)); (4)
    }
}
1 Your Processor is declared with the annotation as for a regular processor.
2 The handled value type, is a class that can be serialized in JSON.
3 Same POJO value in the process() method.
4 Similarly, a POJO value can be sent down to the output topic.

Configuring JSON serialization

In order to customize the JSON serialization, the SDK relies on the ObjectMapper customization from Quarkus

Serialization to Protobuf and its caveats

Protobuf values and String are the first class citizen data types in Kafka Streams Processor messaging.

To ease development with Protobuf, various libraries support code generation from .proto files or OpenAPI descriptors, for instance openapi-generator.

The main problem with this approach is that certain details of the serialization logic are implementation-dependent and may have impacts on the usability of the generated APIs.

Null values

One such case in the handling of optional elements, i.e. how to detect in business code if an element was absent, or present but with an empty value.

There are multiple solutions to this problem, e.g.:

  • Use built-in primitive types

  • Use custom Nullable types instead of primitives

  • Use built-in wrapper types instead of primitives

  • Use optional in proto3

Serializer & Deserializer Customization

API

The SDK provides the interface io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer which allows you to specify the serializer and deserializer you need. You need to declare a bean implementing that interface.

It is available through the following dependency:

pom.xml
<dependency>
  <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
  <artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
Multiple customizers can be defined, and their execution order controlled through @Priority annotations.
Example

In this example, we implement a processor working with a POJO as value type. The POJO is read from kafka, or written to kafka, with a custom serialization format provided by sink and source serializer/deserializer that need to be known at build time.

PojoProcessor.java
@Slf4j
@Processor (1)
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { (2)
    @Override
    public void process(Record<String, SamplePojo> record) { (3)
        String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
        log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
        SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
                !record.value().getBooleanField());
        context().forward(record.withValue(pojo)); (4)
    }
}
1 Your Processor is declared with the annotation as for a regular processor.
2 The handled value type, in this example, is a simple POJO, nothing fancy.
3 Same POJO value in the process() method.
4 Similarly, a POJO value can be sent down to the output topic.

Ser/Des Customization

SampleConfigurationCustomizer.java
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
  @Override
  public void fillConfiguration(Configuration configuration) {
      configuration.setSinkValueSerializer(new MyCustomSerializer<SamplePojo>());
    configuration.setSourceValueSerde(new MyCustomSerde(SamplePojo));
  }
}

The io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer implementation provided sets the required Serializer and Deserializer into the method’s io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration object. In this example, we set a custom serialization through the usage of Kafka Serdes.

Error management

Strategy for error management

When an exception occurs while processing a record, different strategies have been defined:

  • "continue" (default) : drop the message and continue processing

  • "fail" (not implemented yet) : fail and stop processing more message

  • "dead-letter-queue" : send the message to the DLQ and continue processing

DLQ monitoring

Such exception occurs are counted by a custom metric of this extension. If the dead-letter-queue strategy is chosen, the messages produced to the DLQ topics are counted.

Dead-letter-queue strategy

In the case of a dead-letter-queue strategy, the DLQ topic is added to the readiness probe to ensure the ability to produce the poisonous message to this topic.

This local DLQ is meant only to store the poisonous message that the application could not consume successfully. For error happening at production time, one can use a global DLQ defined for this purpose.

Global Dead Letter Queue

When a microservice cannot produce to a downstream Kafka broker and that the error cannot be managed by the application, the message will be produced in a global DLQ.

The following value must be set in the application.properties file

application.properties
kafkastreamsprocessor.global-dlq.topic=<global-dlq-topic>

By default, the maximum size for the message is java.lang.Integer.MAX_VALUE for the global DLQ. It can be modified with:

application.properties
kafkastreamsprocessor.global-dlq.max-message-size=3000

All the properties used in the extension are recapped here

Error metadata

For tracking purpose following headers are added to the record:

  • dead-letter-reason: The reason of the failure if any (Exception message)

  • dead-letter-cause: The cause of the failure if any (Exception cause)

  • dead-letter-topic: The original topic of the record, in this case the output topic where production failed

  • dead-letter-partition: The original partition of the record

Retries

In order to improve resiliency and to avoid losing messages due to the temporary unavailability of an external system (database, HTTP endpoint, …​), the Kafka Streams Processor SDK has a retry logic for the exceptions raised during the process of the message.

The retry relies on the at-least-once guarantees of messaging: the processing must already support being called multiple times with the same message, and must make sure any external interaction is idempotent.

A RetryDecorator has been introduced inside the topology definition. Its responsibility is to retry indefinitely the @io.quarkiverse.kafkastreamsprocessor.api.Processor retryableProcess method when a io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException is raised.

Configuration

The default behavior can be overridden via the following configuration:

application.properties
kafkastreamsprocessor.retry.max-retries=-1
kafkastreamsprocessor.retry.retry-on=io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException

As this implementation is based on Eclipse Microprofile Fault Tolerance it’s also possible to override some default policy (i.e.: maxDuration, jitter…​) under the prefix kafkastreamsprocessor.retry..

Ad hoc usage

You can also use the Fault Tolerance @org.eclipse.microprofile.faulttolerance.Retry annotation for some other logic in your code.

PingerService.java
public class PingerService {
    @Retry (1)
    @Override
    public Response ping(String message) {
    }
}
1 Define the method to retry with org.eclipse.microprofile.faulttolerance.Retry annotation
application.properties
# Override max retry default value
io.quarkiverse.kafkastreamsprocessor.example.PingerService/Retry/maxRetries=3

Monitoring

Kafka metrics cardinality

Users have to pay attention to some specificities of Kafka Streams metrics: some metrics have labels whose values are dynamic. There’s a possible impact on Prometheus because this could drastically increase the amount of data needed to store the time series. To understand how IDs label are computed, one can read the following documentation on KafkaStreams runtime information

To mitigate the impact, it is a good practice to fix arbitrary prefix to dynamic labels but still, the variability cannot be removed entirely.

As an example, all Kafka metrics have in common the label client_id, the resulting computation of the label by the SDK is:

Example metric
kafka_consumer_fetch_manager_records_consumed_total{client_id="application-name-StreamThread-1-consumer",kafka_version="3.6.3",topic="ping-events",} 1.0

The metric client_id value is formed of :

  • quarkus-kafka-streams-processor-simple-sample which is the client id defined in kafka-streams.client.id config property

  • StreamThread-1-consumer which is computed by KafkaStreams and depends on the number of threads.

So it is a good practice to set kafka-streams.client.id with the name of the microservice.

quarkus-kafka-streams-processor metric list

This list includes the additional metrics, on top of the Kafka Streams and the general Kafka ones, that this extension instruments:

Table 1. Messaging metrics
Micrometer metric name Type Description

kafkastreamsprocessor.processor.errors

Counter

Total number of errors encountered during Kafka Streams message processing

Table 2. Punctuation metrics
Micrometer metric name Type Description

kafkastreamsprocessor.punctuation.errors

Counter

The number of times the execution of a Punctuator failed with an exception since the start of the microservice.

Table 3. Dead Letter Queue Metrics
Micrometer metric name Type Description

kafkastreamsprocessor.dlq.sent

Counter

The number of messages sent to DLQ. This metrics differs from the kafkastreamsprocessor.processor.errors in that the deserialization errors are taken into account. If no DLQ strategy is chosen, then this metric will be 0. Messages too large for the DLQ will be counted both in this metrics and in kafkastreamsprocessor.global.dlq.sent.

kafkastreamsprocessor.global.dlq.sent

Counter

The number of messages sent to global DLQ.

A comparison between Reactive Messaging Kafka and Kafka Streams

These two technologies can be used to create streaming microservices to be used in Event-Driven architecture applications.

As explained in the Streaming processor introduction, a streaming microservice has some requirements:

  • An incoming message, from an input topic, can be acknowledged only when it has been processed and its result message has been produced and stored in a Kafka output topic.

  • If an incoming message could not be acknowledged (e.g: a microservice crash) then the incoming message will be redelivered (at-least-once guarantee).

  • Incoming messages are processed in the order of arrival (ordering guarantee)

In the Kafka Streams bootstrap, the guarantee of ordering from the source topic has a direct impact on the message consumption and the scalability model. As explained in the Kafka Streams, the number of partitions of the source topic determines how many threads can consume in parallel.

In other terms, for an input topic, it’s not possible to process more that x messages in parallel where x is the number of partitions of the source topic.

Starting asynchronous processing in a Kafka Streams microservice would lead to loss of messages: acknowledgement of the incoming messages will be de-synchronized from the production of messages once outgoing messages have been processed.

Reactive messaging has a different architecture and threading model compared to Kafka Streams. But by default, when processing messages it comes with the same guarantees seen above.

However, it’s possible to alter the behaviour of the message processing with annotations to improve, in a drastic manner, the concurrency of a microservice. The purpose of increasing concurrency is to be able to cope with streaming microservice that need to call slow remote APIs, this is the bridging from event-driven to service-oriented architecture.

Concurrent processing in Reactive Messaging

  @Incoming("ping-events") (1)
  @Outgoing("pong-events") (2)
  @Blocking(ordered = false) (3)
  public Ping process(Ping ping) {
    return api.remoteCall();
  }
1 @Incoming is declaring this method as a subscriber for the channel named ping-events
2 @Outgoing is declaring this method as a producer for the channel named pong-events
3 @io.smallrye.reactive.messaging.annotations.Blocking Indicates that this method is running out of the processing thread, inside a worker thread and the order of the messages is not important. Note that once the execution is finished the result is handled back to the processing thread.

Guarantee of delivery in concurrent processing

In Kafka Streams, performing asynchronous calls would lead to the possibility of messages not being redelivered. What about the guarantee of delivery when processing concurrent messages in Reactive Messaging?

It’s up to the acknowledgement policy to commit incoming messages when it has been processed concurrently. The default strategy is throttled and guarantees at-least-once delivery

throttled keeps track of received messages and commits to the next offset after the latest acked message in sequence. This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing.

Effects of throttled.unprocessed-record-max-age.ms on the liveness probe

When using the throttled strategy, the commit of the sequence of messages will happen only when all messages have been processed (ordered or not).

If the processing method does not return before reaching the throttled.unprocessed-record-max-age.ms timeout, the microservice will be considered as unhealthy and the liveness probe will fail and the microservice will be restarted. It means that, for example, if a remote API takes too long to respond for some queries, and its timeout is configured with a value above throttled.unprocessed-record-max-age.ms, then messages will be received again after the restart, acting like a poison pill.

Processors summary

Here’s a recap of the guarantees offered by the different way of processing messages.

Processor Guarantee of order At-least-once delivery Max concurrency

Default Kafka Streams

Yes

Yes

number of partitions

Kafka Streams asynchronous

No

No

number of threads

Default Reactive Messaging

Yes

Yes

number of partitions

Reactive Messaging with @Blocking

Yes

Yes

number of partitions

Reactive Messaging with @Blocking(order=false)

No

Yes

number of threads

Stateful EDA

Introduction

To support stateful requirements, Kafka Streams' Processor need to implement a State-Store. By default, no State-Store are linked to the io.quarkiverse.kafkastreamsprocessor.api.Processor but the application can override this configuration.

API

The SDK provides the interface io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer which allows you to specify the State-Store you need. You need to declare a bean implementing that interface.

It is available through the following dependency:

pom.xml
<dependency>
  <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
  <artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
Multiple customizers can be defined, and their execution order controlled through @Priority annotations.

Example

In this example, we implement a processor which is using a State-Store.

Processor

StateStoreProcessor.java
@Slf4j
@Processor (1)
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
    private KeyValueStore<String, String> pingData;

    @Override
    public void init(ProcessorContext<String, Ping> context) {
        super.init(context);
        pingData = context.getStateStore("ping-data"); (2)
        context.schedule(Duration.ofMillis(1L), PunctuationType.STREAM_TIME, new DuplicateValuePunctuator(pingData));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void process(Record<String, Ping> record) {
        log.info("Process the message: {}", record.value().getMessage());

        String previousValue = pingData.get(record.key());
        pingData.put(record.key(), record.value().getMessage());

        if (previousValue == null) {
            context().forward(
                    record.withValue(Ping.newBuilder().setMessage("Store initialization OK for " + record.key()).build()));
        } else {
            context().forward(record.withValue(
                    Ping.newBuilder().setMessage("Previous value for " + record.key() + " is " + previousValue).build()));
        }
    }
}
1 Your Processor is declared with the annotation as for a regular processor.
2 The definition and initialization of your state store.

StateStore Customization

SampleConfigurationCustomizer.java
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
    @Override
    public void fillConfiguration(Configuration configuration) {
        List<StoreConfiguration> storeConfigurations = new ArrayList<>();
        // Add a key value store for indexes
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("ping-data"),
                Serdes.String(),
                Serdes.String());
        storeConfigurations.add(new StoreConfiguration(storeBuilder));
        configuration.setStoreConfigurations(storeConfigurations);
    }
}

The io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer implementation provided sets the required State-Store configurations into the method’s io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration object. In this example a ping-data key/value store has been created.

Punctuation

Kafka Streams allows you to define Punctuator that are sort of scheduled tasks that Kafka Streams triggers (Kafka Streams documentation). One key issue with Punctuators is that they do not support Exceptions:

  • a checked Exception cannot be thrown as the method signature does not allow it

  • a RuntimeException because Kafka Streams does not catch it. It basically crashes your whole microservice.

To work around the latter point and increase stability, the quarkus-kafka-streams-processor extension wraps a Punctuator before it is added in Kafka Streams to catch the RuntimeException and log an error instead. Exceptions are also counted with a dedicated metric.

Custom decorators

The extension proposes some capabilities to customize more finely the behaviour of the different layers of decoration in addition to the ones that this extension brings to the table.

Processor decorator

The following decoration layer is already extensively used in this extension’s source code and allows to use composition around the main processor class you have to define. The pattern to implement is:

@Dependent (1)
@Priority(150) (2)
public class ProcessorDecorator extends AbstractProcessorDecorator { (3)
    @Override
    public void process(Record record) { (4)
      // use bean before
      getDelegate().process(record);
      // use bean after
    }
}
1 We have to mark the bean Dependent so it is instantiated at every use. Indeed, KStreamProcessorSupplier needs to return a new Processor instance everytime it is called, by Kafka Streams' specification.
2 We add a Priority, to control exactly the order of decorators. The priority is to be set based on the priorities of the existing decorators which are:
ProcessorDecoratorPriorities.java
     /**
     * Priority of the decorator in charge of tracing, creating a span around the
     * {@link ContextualProcessor#process(Record)} method.
     */
    public static final int TRACING = 100;
    /**
     * Priority of the decorator in charge or initializing a "request context" for the duration of the processing of the
     * {@link ContextualProcessor#process(Record)} method. It is closed afterward.
     */
    public static final int CDI_REQUEST_SCOPE = 200;
    /**
     * Priority of the decorator that will handle exception and potentially redirect the message in a dead letter queue
     * topic, if configured.
     */
    public static final int DLQ = 300;
    /**
     * Priority of the decorator in charge of measuring the processing time and the number of exceptions thrown.
     */
    public static final int METRICS = 400;
    /**
     * Priority of the decorator in charge of injecting all {@link DecoratedPunctuator} configured by the framework and
     * your custom potential additions.
     */
    public static final int PUNCTUATOR_DECORATION = 500;
    /**
     * Priority of the decorator in charge of implementing a form of fault tolerance by means of calling again the
     * {@link ContextualProcessor#process(Record)} method.
     */
    public static final int RETRY = 600;
3 We remove the generic types from the class signature, because CDI does not like generics in beans.
4 Example of override of process method and call to underlying decorator.

Such a decorator will automatically been taken into account by CDI. The priority will control at which point your decorator will be called among all other decorators.

Producer interceptor

Kafka Streams already has the concept of a ProducerInterceptor. But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation. It does not support CDI resolution.

This is why this extension’s API defines a ProducerOnSendInterceptor interface that is instrumented through CDI. Example of usage:

MyProducerInterceptor.java
@ApplicationScoped (1)
public class HeaderAddingProducerInterceptor implements ProducerOnSendInterceptor { (2)
    @Override
    public int priority() { (3)
        return 200;
    }

    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> producerRecord) { (4)
        producerRecord.headers().remove("header");
        producerRecord.headers().add("header", "headerValue".getBytes(StandardCharsets.UTF_8));
        return producerRecord;
    }
}
1 Producer interceptors are discovered by CDI by the ApplicationScoped annotation
2 The interceptor class should extend ProducerOnSendInterceptor. ProducerOnSendInterceptor extends ProducerInterceptor<byte[], byte[]> and overrides some of its methods with default implementations to exempt their forced implementations further down the line.
3 This is not mandatory. But it allows to control in which order multiple interceptor that would defined are called. The default priority is 100.
4 The intercepted method call which allow to modify the ProducerRecord before it is finally sent to Kafka. In this example, we replace a header named header with another value headervalue.

Punctuator decorator

A Kafka Streams Punctuator is a callback to use with ProcessorContext#schedule(…​). It allows to schedule a periodic operation, depending on batch of incoming messages or on a timely manner.

We propose in the extension a way to decorate any Punctuator a microservice would create.

The extension does not give the capability to narrow the decoration on only a given instance or class of Punctuator.

The decoration happens through the implementation of a DecoratedPunctuator interface with a bit the same idea as for the Processor decorator: usage of the CDI Decorator feature. Example of decoration:

MyDecoratedPunctuator.java
@Decorator (1)
@Priority(150) (2)
public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator { (3)
    @lombok.experimental.Delegate(excludes = Excludes.class)
    private final DecoratedPunctuator delegate; (4)

    private final MyBean myBean;

    @Inject
    public CdiRequestContextPunctuatorDecorator(@Delegate DecoratedPunctuator delegate, MyBean myBean) { (5)
        this.delegate = delegate;
        this.myBean = myBean;
    }

    @Override
    public void punctuate(long timestamp) { (6)
        // use before punctuate the myBean reference
        delegate.punctuate(timestamp);
        // use after punctuate the myBean reference
    }

    private interface Excludes {
        void punctuate(long timestamp);
    }
}
1 Decorator annotation to profit from the decorator feature of CDI
2 Force the instantiation of the decorator with the Priority annotation. Indeed, otherwise the decorator is not taken into account by Quarkus. The priority is to be set based on the priorities of the existing decorators which are:
PunctuatorDecoratorPriorities.java
     /**
     * Priority of the {@link DecoratedPunctuator} that enabled a "request context" for the duration of the
     * {@link Punctuator#punctuate(long)} processing.
     */
    public static final int CDI_REQUEST_SCOPE = 100;
    /**
     * Priority of the {@link DecoratedPunctuator} that catches punctuation exception to avoid making the entire
     * microservice crash and counts those exceptions in a metric.
     */
    public static final int METRICS = 200;
3 The decorator should extend the DecoratedPunctuator interface defined by this extension. It won’t be considered otherwise.
4 Delegate reference to use when decorating methods. It is annotated with Delegate annotation from Lombok to generate passthrough decorated methods that this Decorator class won’t decorate. The selection is done through a blacklist of method signatures gathered in a private Excludes interface declared at the end of the class.
5 Injection constructor which must have a delegate argument annotated with the Delegate annotation from CDI. Is showcased here also the injection of another bean to be used in this decorator.
6 Example of decorated method, here the main punctuate method of the Punctuator interface.

Extension configuration reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

The Kafka streams processor property to define a unique topic for incoming messages

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_TOPIC

string

The Kafka streams processor property to define multiple topics for incoming messages (comma separated list).

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_TOPICS

string

The Kafka topic for incoming messages for the given source name.

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_SOURCES__source uppercase__TOPIC

string

The Kafka topics for outgoing messages.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_TOPIC

string

The Kafka topic for outgoing messages for the given sink name.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_SINKS__sink uppercase__TOPIC

string

Dead letter Queue name

Environment variable: KAFKASTREAMSPROCESSOR_DLQ_TOPIC

String

Global Dead letter Queue to produce error messages note manage by the application

Environment variable: KAFKASTREAMSPROCESSOR_GLOBAL_DLQ_TOPIC

String

Global Dead letter Queue maximum request size

Environment variable: KAFKASTREAMSPROCESSOR_GLOBAL_DLQ_PRODUCER_MAX_MESSAGE_SIZE

int

java.lang.Integer.MAX_VALUE

Enable the production of the error message in the global DLQ when the application can not manage the error.

Environment variable: RESILIENCY_KAFKA_GLOBALDLQ_ENABLED

boolean

false

Kafka Streams Processor error strategy

Environment variable: KAFKASTREAMSPROCESSOR_ERROR_STRATEGY

String

continue

Max number of retries. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_MAX_RETRIES

String

-1

The delay between retries. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DELAY

long

0

The unit for delay. Default milliseconds. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DELAY_UNIT

ChronoUnit

MILLIS

The max duration. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_MAX_DURATION

long

180000

The unit for max duration. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DURATION_UNIT

ChronoUnit

MILLIS

Jitter value to randomly vary retry delays for. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_JITTER

long

200

The delay unit for jitter. Default is milliseconds. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_JITTER_DELAY_UNIT

ChronoUnit

MILLIS

The list of exception types that should trigger a retry. Default is the provided io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException. See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_RETRY_ON

Exception class names

[io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException]

The list of exception types that should <i>not</i> trigger a retry. Default is empty list See microprofile doc.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_ABORT_ON

Exception class names

[]

Configuration from other extension

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

A comma-separated list of topic names.

The micro-service will only be ready once all these topics are present in the Kafka cluster.

Environment variable: QUARKUS_KAFKA_STREAMS_TOPIC

list of string

A unique identifier for this Kafka Streams application.

It is used as Kafka consumer group id, so multiple replicas of micro-service balance their partition workload.

Environment variable: QUARKUS_KAFKA_STREAMS_APPLICATION_ID

string

${quarkus.application.name}

It is the number of milliseconds a producer is willing to wait before sending a batch out.

Environment variable: KAFKA_STREAMS_PRODUCER_LINGER_MS

int

0

The compression type for all data generated by the producer.

Environment variable: KAFKA_STREAMS_COMPRESSION_TYPE

Possible values:

none gzip snappy lz4 zstd

none