Kafka Streams Processor
This extension eases the implementation of event-driven architecture streaming microservices based on Kafka Streams’ Processor API. It spares Kafka Streams users a lot of the boilerplate code needed to get a dedicated topology and proper production readiness (health, observability, error handling). All that is left to develop is a simple Processor class; the extension takes care of the rest.
Streaming processor
A streaming microservice has some requirements:

- Kafka Streams processors are not thread-safe, so a processor scope is @jakarta.enterprise.context.Dependent by default.
- An incoming message, from an input topic, can be acknowledged only when it has been processed and its result message has been produced and stored in a Kafka output topic.
- If an incoming message could not be acknowledged (e.g. a microservice crash), the incoming message will be redelivered (at-least-once guarantee).
- Incoming messages are processed in their order of arrival (ordering guarantee).
There are currently several bootstraps or libraries that support writing stream processor-based applications on different broker technologies. This extension’s processor solution is based on Kafka Streams' Processor API.
Kafka Streams
Kafka Streams is an open source client library for writing streaming applications.
It is enabled by the quarkus-kafka-streams extension:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
<scope>runtime</scope>
</dependency>
Only one threading model is supported: 1 thread per topic-partition (see details).
The number of partitions on the source topic determines up to how many threads can consume in parallel.
Synchronous/blocking programming must be used to preserve the execution and ordering guarantees.
There is a proposal under discussion for adding asynchronous processing to Kafka Streams.
A simplified Processor API
On top of Kafka Streams, the quarkus-kafka-streams-processor extension adds features to help you write streaming microservices.
The following dependency needs to be added:
<project>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
<version>${quarkus.kafkastreamsprocessor.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
With the extension, you are expected to define an annotated Kafka Streams Processor. It is discovered as a CDI bean and instrumented by the Quarkus runtime. The difference with the fully fledged Kafka Streams Processor API is that there is no need to build a custom Topology for each new microservice: it is built by the Quarkus runtime.
Example:
@Slf4j
@Processor (1)
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> { (2)
@Override
public void process(Record<String, Ping> record) { (3)
String resultMessage = countChars(record.value().getMessage());
Ping pong = Ping.newBuilder().setMessage(resultMessage).build();
context().forward(record.withValue(pong)); (4)
}
/**
* Counts the nb of characters and returns it as string.
*/
private String countChars(String input) {
return String.valueOf(input.length());
}
}
1 | Processors are beans managed by CDI, and as such need to be annotated with
@io.quarkiverse.kafkastreamsprocessor.api.Processor so that any other CDI dependency may be injected.
Kafka Streams' processors are not thread-safe, so a processor scope is @jakarta.enterprise.context.Dependent by default. |
2 | Directly extend org.apache.kafka.streams.processor.api.ContextualProcessor , specifying the type of keys and values.
For keys, only type String is supported.
For values, see supported types. |
3 | The process method is called for each incoming message.
Any processing must be done synchronously. |
4 | The forward method produces an outgoing message.
Internally, Kafka Streams links the acknowledgment of the outgoing message with the commit of the associated incoming message. |
Topology builder
In short (see core concepts), in Kafka Streams a Topology is made of:

- Sources: 1 or more input topics
- Processors: a graph of 1 or more record handlers
- Stores (optional): for stateful use cases, the processor must be assigned one or multiple stores to keep its state
- Sinks: 1 or more output topics
The extension will automatically build the following topology:

- Scan for a CDI bean of type org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> (single processor). The deprecated type org.apache.kafka.streams.processor.Processor<K, V> is also supported for backward compatibility.
- Register a source for a single topic defined in kafkastreamsprocessor.input.topic, with a String key serializer and a Protobuf value deserializer based on the declared type <V> of the processor (see the configuration sketch after this list). If multiple topics are declared, the configuration should be kafkastreamsprocessor.input.topics. Also, to handle topics coming from different sources, the property should include the source name, as in kafkastreamsprocessor.input.sources.<source>.topic.
- Register as many sinks as topics listed in properties matching kafkastreamsprocessor.output.sinks.<sink>.topic, with a String key serializer and a Protobuf value serializer. The sink name is taken from the property name. If no kafkastreamsprocessor.output.sinks.<sink>.topic property is defined and a single topic is defined in kafkastreamsprocessor.output.topic, a default sink is used.
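The following application.properties sketch illustrates how these source and sink properties are typically set; the topic names (ping-events, pong-events, error-events) and the sink names (pong, error) are hypothetical examples:
# single input topic consumed by the processor
kafkastreamsprocessor.input.topic=ping-events
# default sink: single output topic
kafkastreamsprocessor.output.topic=pong-events
# alternatively, several named sinks can be declared:
# kafkastreamsprocessor.output.sinks.pong.topic=pong-events
# kafkastreamsprocessor.output.sinks.error.topic=error-events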
Configuration
Please consider two kinds of properties:
First class citizen properties
First class citizen properties are a set of properties prefixed with quarkus.kafka-streams.
They are consumed by the Quarkus kafka-streams extension itself (used at build time and overridable, for some of them, at runtime).
In Kafka Streams Processor, a few of these configuration keys are required: refer to the Kafka Streams extension’s configuration reference for the full list.
All the properties used in the extension are recapped here.
Pass-through properties
Given the large number of properties that can be set to configure a Kafka Streams application, it is impossible for the Quarkus extension to know them all.
However, you can use pass-through properties in your application configuration, using the property name prefixed with kafka-streams.
# First citizen property
quarkus.kafka-streams.bootstrap-servers=localhost:9092
# Pass-through properties
kafka-streams.producer.linger.ms=0
kafka-streams.compression.type=gzip
Override pass-through properties
Overriding pass-through properties at runtime is a little bit hacky, because you need to prefix the property with quarkus. and pass it as a Java system property.
Passing the property as an environment variable following the MicroProfile Config convention does NOT work.
For example, a possible way to override a pass-through property in a Kafka Streams Processor microservice is to use the corresponding system property on the JVM command line:
-Dquarkus.kafka-streams.producer.linger.ms=50
Data serialization
Data serialization refers to the process of converting complex data structures or objects into a format that can be easily transmitted over a network. The extension’s processors support different serialization formats. By analyzing the types of declared input and output payloads of the processor, it’s possible to infer the type of data to serialize and set up the correct serializer and deserializer.
As of today, the extension’s processors support three first-class citizen formats and one customization feature:

- JSON (the default)
- Protobuf
- String
- Other data formats can be supported through the Serializer/Deserializer customization

JSON is the default serialization format. If the payload cannot be serialized to JSON, an error is thrown. Depending on the Global DLQ Error management, the message will be produced in the DLQ and the incoming message will be acknowledged in the input topic.
JSON
JSON is a first class citizen serialization format; the SDK uses JSON as the default serialization format.
Example
In this example, we implement a processor working with a POJO as the value type. The POJO is read from and written to Kafka using its JSON textual representation.
@Slf4j
@Processor (1)
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { (2)
@Override
public void process(Record<String, SamplePojo> record) { (3)
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); (4)
}
}
1 | Your Processor is declared with the annotation as for a regular processor. |
2 | The handled value type is a class that can be serialized to JSON. |
3 | Same POJO value in the process() method. |
4 | Similarly, a POJO value can be sent down to the output topic. |
Configuring JSON serialization
In order to customize the JSON serialization, the SDK relies on the ObjectMapper customization from Quarkus.
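For instance, a minimal sketch of such a customization, assuming the io.quarkus.jackson.ObjectMapperCustomizer interface from the Quarkus Jackson extension; the toggled feature is only an illustration:
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.jackson.ObjectMapperCustomizer;
import jakarta.inject.Singleton;

@Singleton
public class SampleObjectMapperCustomizer implements ObjectMapperCustomizer {
    @Override
    public void customize(ObjectMapper objectMapper) {
        // Ignore unknown JSON fields instead of failing deserialization
        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    }
}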
Serialization to Protobuf and its caveats
Protobuf values and String are the first class citizen data types in Kafka Streams Processor messaging.
To ease development with Protobuf, various libraries support code generation from .proto
files or
OpenAPI descriptors, for instance openapi-generator.
The main problem with this approach is that certain details of the serialization logic are implementation-dependent and may have impacts on the usability of the generated APIs.
Null values
One such case is the handling of optional elements, i.e. how to detect in business code whether an element was absent, or present but with an empty value.
There are multiple solutions to this problem, e.g.:

- Use built-in primitive types
- Use custom Nullable types instead of primitives
- Use built-in wrapper types instead of primitives
- Use optional in proto3 (see the sketch after this list)
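As an illustration of the last option, the following sketch assumes a hypothetical proto3 message Customer with an optional string field middle_name; proto3 optional fields generate a presence accessor (hasMiddleName()) in the Java code produced by protoc:
// Hypothetical proto3 definition (not part of this extension):
//   message Customer {
//     string name = 1;
//     optional string middle_name = 2; // "optional" generates hasMiddleName()
//   }
String formatName(Customer customer) {
    if (customer.hasMiddleName()) {
        // field explicitly set, possibly to an empty string
        return customer.getName() + " " + customer.getMiddleName();
    }
    // field absent from the serialized message
    return customer.getName();
}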
Serializer & Deserializer Customization
API
The SDK provides the interface io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer
which allows you to specify the serializer and deserializer you need.
You need to declare a bean implementing that interface.
It is available through the following dependency:
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
Multiple customizers can be defined, and their execution order controlled through @Priority annotations.
Example
In this example, we implement a processor working with a POJO as the value type. The POJO is read from and written to Kafka with a custom serialization format provided by sink and source serializer/deserializer that need to be known at build time.
@Slf4j
@Processor (1)
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { (2)
@Override
public void process(Record<String, SamplePojo> record) { (3)
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); (4)
}
}
1 | Your Processor is declared with the annotation as for a regular processor. |
2 | The handled value type, in this example, is a simple POJO, nothing fancy. |
3 | Same POJO value in the process() method. |
4 | Similarly, a POJO value can be sent down to the output topic. |
Ser/Des Customization
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
public void fillConfiguration(Configuration configuration) {
configuration.setSinkValueSerializer(new MyCustomSerializer<SamplePojo>());
configuration.setSourceValueSerde(new MyCustomSerde(SamplePojo.class));
}
}
The io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer
implementation provided sets the required Serializer and Deserializer into the method’s io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration
object.
In this example, we set a custom serialization through the usage of Kafka Serdes.
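The MyCustomSerializer and MyCustomSerde classes above are placeholders. A custom serializer only has to implement Kafka's org.apache.kafka.common.serialization.Serializer interface; here is a minimal sketch, assuming Jackson is used for the actual byte encoding:
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Serializer;

public class MyCustomSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // Encode the value; checked exceptions are wrapped in a RuntimeException
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Serialization failure for topic " + topic, e);
        }
    }
}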
Error management
Strategy for error management
When an exception occurs while processing a record, different strategies have been defined (see the configuration sketch after this list):

- "continue" (default): drop the message and continue processing
- "fail" (not implemented yet): fail and stop processing more messages
- "dead-letter-queue": send the message to the DLQ and continue processing
DLQ monitoring
Such exceptions are counted by a custom metric of this extension.
If the dead-letter-queue strategy is chosen, the messages produced to the DLQ topic are also counted.
Dead-letter-queue strategy
In the case of the dead-letter-queue strategy, the DLQ topic is added to the readiness probe, to ensure the ability to produce the poisonous message to this topic.
This local DLQ is meant only to store the poisonous messages that the application could not consume successfully. For errors happening at production time, one can use the global DLQ defined for this purpose.
Global Dead Letter Queue
When a microservice cannot produce to a downstream Kafka broker and the error cannot be managed by the application, the message is produced to a global DLQ.
The following value must be set in the application.properties file:
kafkastreamsprocessor.global-dlq.topic=<global-dlq-topic>
By default, the maximum message size for the global DLQ is java.lang.Integer.MAX_VALUE.
It can be modified with:
kafkastreamsprocessor.global-dlq.max-message-size=3000
All the properties used in the extension are recapped
here
Error metadata
For tracking purposes, the following headers are added to the record (see the reading sketch after this list):

- dead-letter-reason: the reason of the failure, if any (exception message)
- dead-letter-cause: the cause of the failure, if any (exception cause)
- dead-letter-topic: the original topic of the record, in this case the output topic where production failed
- dead-letter-partition: the original partition of the record
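On the consumer side of the global DLQ, these headers can be read with the standard Kafka Headers API. A minimal sketch; the DeadLetterHeaders helper class is hypothetical:
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public final class DeadLetterHeaders {
    private DeadLetterHeaders() {
    }

    // Returns the dead-letter-reason header of a record consumed from the global DLQ, or null if absent
    public static String deadLetterReason(ConsumerRecord<byte[], byte[]> dlqRecord) {
        Header reason = dlqRecord.headers().lastHeader("dead-letter-reason");
        return reason == null ? null : new String(reason.value(), StandardCharsets.UTF_8);
    }
}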
Retries
In order to improve resiliency and to avoid losing messages due to the temporary unavailability of an external system (database, HTTP endpoint, …), the Kafka Streams Processor SDK has a retry logic for the exceptions raised during the processing of a message.
The retry relies on the at-least-once guarantee of messaging: the processing must already support being called multiple times with the same message, and must make sure any external interaction is idempotent.
A RetryDecorator has been introduced inside the topology definition.
Its responsibility is to retry indefinitely the retryableProcess method of the @io.quarkiverse.kafkastreamsprocessor.api.Processor when an io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException is raised.
Configuration
The default behavior can be overridden via the following configuration, e.g.:
kafkastreamsprocessor.retry.max-retries=-1
kafkastreamsprocessor.retry.retry-on=io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException
As this implementation is based on Eclipse MicroProfile Fault Tolerance, it is also possible to override some of the default policies (e.g. maxDuration, jitter, …) under the prefix kafkastreamsprocessor.retry.
Ad hoc usage
You can also use the Fault Tolerance @org.eclipse.microprofile.faulttolerance.Retry
annotation for some other logic in your code.
public class PingerService {
@Retry (1)
@Override
public Response ping(String message) {
// body elided: remote call that may need to be retried
}
}
1 | Define the method to retry with org.eclipse.microprofile.faulttolerance.Retry annotation |
# Override max retry default value
io.quarkiverse.kafkastreamsprocessor.example.PingerService/Retry/maxRetries=3
Monitoring
Kafka metrics cardinality
Users have to pay attention to some specificities of Kafka Streams metrics: some metrics have labels whose values are dynamic. There is a possible impact on Prometheus, because this can drastically increase the amount of data needed to store the time series. To understand how the label IDs are computed, one can read the documentation on KafkaStreams runtime information.
To mitigate the impact, it is good practice to fix an arbitrary prefix for dynamic labels, but still, the dynamicity cannot be removed entirely.
As an example, all Kafka metrics have the label client_id in common; the resulting computation of the label by the SDK is:
kafka_consumer_fetch_manager_records_consumed_total{client_id="application-name-StreamThread-1-consumer",kafka_version="3.6.3",topic="ping-events",} 1.0
The client_id value is formed of:

- quarkus-kafka-streams-processor-simple-sample, which is the client id defined in the kafka-streams.client.id config property
- StreamThread-1-consumer, which is computed by KafkaStreams and depends on the number of threads

So it is good practice to set kafka-streams.client.id to the name of the microservice, as in the sketch below.
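A minimal application.properties sketch, assuming the microservice is named ping-processor (hypothetical name):
# align the Kafka Streams client id with the microservice name
kafka-streams.client.id=ping-processor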
quarkus-kafka-streams-processor metric list
This list includes the additional metrics, on top of the Kafka Streams and the general Kafka ones, that this extension instruments:
Micrometer metric name | Type | Description |
---|---|---|
kafkastreamsprocessor.processor.errors | Counter | Total number of errors encountered during Kafka Streams message processing |
kafkastreamsprocessor.punctuation.errors | Counter | The number of times a Punctuator’s execution failed with an exception since the start of the microservice. |
kafkastreamsprocessor.dlq.sent | Counter | The number of messages sent to the DLQ. This metric differs from kafkastreamsprocessor.processor.errors in that deserialization errors are also taken into account. If no DLQ strategy is chosen, then this metric will be 0. Messages too large for the DLQ are counted both in this metric and in kafkastreamsprocessor.global.dlq.sent. |
kafkastreamsprocessor.global.dlq.sent | Counter | The number of messages sent to the global DLQ. |
A comparison between Reactive Messaging Kafka and Kafka Streams
These two technologies can be used to create streaming microservices for event-driven architecture applications.
As explained in the Streaming processor introduction, a streaming microservice has some requirements:

- An incoming message, from an input topic, can be acknowledged only when it has been processed and its result message has been produced and stored in a Kafka output topic.
- If an incoming message could not be acknowledged (e.g. a microservice crash), the incoming message will be redelivered (at-least-once guarantee).
- Incoming messages are processed in their order of arrival (ordering guarantee).
In the Kafka Streams bootstrap, the guarantee of ordering from the source topic has a direct impact on message consumption and on the scalability model. As explained in the Kafka Streams section, the number of partitions of the source topic determines how many threads can consume in parallel.
In other terms, for an input topic, it is not possible to process more than x messages in parallel, where x is the number of partitions of the source topic.
Starting asynchronous processing in a Kafka Streams microservice would lead to the loss of messages: the acknowledgement of incoming messages would be de-synchronized from the production of the corresponding outgoing messages.
Reactive Messaging has a different architecture and threading model compared to Kafka Streams, but by default it comes with the same guarantees seen above when processing messages.
However, it is possible to alter the message-processing behaviour with annotations to drastically improve the concurrency of a microservice. The purpose of increasing concurrency is to cope with streaming microservices that need to call slow remote APIs; this is the bridge from event-driven to service-oriented architecture.
Concurrent processing in Reactive Messaging
@Incoming("ping-events") (1)
@Outgoing("pong-events") (2)
@Blocking(ordered = false) (3)
public Ping process(Ping ping) {
return api.remoteCall();
}
1 | @Incoming is declaring this method as a subscriber for the channel named ping-events |
2 | @Outgoing is declaring this method as a producer for the channel named pong-events |
3 | @io.smallrye.reactive.messaging.annotations.Blocking indicates that this method runs outside the processing thread, inside a worker thread, and that the order of the messages is not important.
Note that once the execution is finished the result is handed back to the processing thread. |
Guarantee of delivery in concurrent processing
In Kafka Streams, performing asynchronous calls would lead to the possibility of messages not being redelivered. What about the guarantee of delivery when processing concurrent messages in Reactive Messaging?
It is up to the acknowledgement policy to commit incoming messages once they have been processed concurrently. The default strategy is throttled and guarantees at-least-once delivery:
throttled keeps track of received messages and commits to the next offset after the latest acked message in sequence.
This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing.
Effects of throttled.unprocessed-record-max-age.ms on the liveness probe
When using the throttled strategy, the commit of the sequence of messages happens only when all the messages have been processed (ordered or not).
If the processing method does not return before the throttled.unprocessed-record-max-age.ms timeout is reached, the microservice is considered unhealthy: the liveness probe fails and the microservice is restarted.
It means that, for example, if a remote API takes too long to respond to some queries, and its timeout is configured with a value above throttled.unprocessed-record-max-age.ms, then messages will be received again after the restart, acting like a poison pill.
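For reference, a sketch of how this timeout is typically tuned for a SmallRye Reactive Messaging Kafka channel; the channel name ping-events comes from the example above and the 60000 ms value is arbitrary:
# per-channel tuning of the throttled commit strategy (value is an example)
mp.messaging.incoming.ping-events.throttled.unprocessed-record-max-age.ms=60000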
Processors summary
Here is a recap of the guarantees offered by the different ways of processing messages.

Processor | Guarantee of order | At-least-once delivery | Max concurrency |
---|---|---|---|
Default Kafka Streams | Yes | Yes | number of partitions |
Kafka Streams asynchronous | No | No | number of threads |
Default Reactive Messaging | Yes | Yes | number of partitions |
Reactive Messaging with @Blocking | Yes | Yes | number of partitions |
Reactive Messaging with @Blocking(order=false) | No | Yes | number of threads |
Stateful EDA
Introduction
To support stateful requirements, a Kafka Streams Processor needs to use a State-Store.
By default, no State-Store is linked to the io.quarkiverse.kafkastreamsprocessor.api.Processor, but the application can override this configuration.
API
The SDK provides the interface io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer
which allows you to specify the State-Store you need.
You need to declare a bean implementing that interface.
It is available through the following dependency:
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
Multiple customizers can be defined, and their execution order controlled through @Priority annotations.
Example
In this example, we implement a processor which is using a State-Store.
Processor
@Slf4j
@Processor (1)
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
private KeyValueStore<String, String> pingData;
@Override
public void init(ProcessorContext<String, Ping> context) {
super.init(context);
pingData = context.getStateStore("ping-data"); (2)
context.schedule(Duration.ofMillis(1L), PunctuationType.STREAM_TIME, new DuplicateValuePunctuator(pingData));
}
/**
* {@inheritDoc}
*/
@Override
public void process(Record<String, Ping> record) {
log.info("Process the message: {}", record.value().getMessage());
String previousValue = pingData.get(record.key());
pingData.put(record.key(), record.value().getMessage());
if (previousValue == null) {
context().forward(
record.withValue(Ping.newBuilder().setMessage("Store initialization OK for " + record.key()).build()));
} else {
context().forward(record.withValue(
Ping.newBuilder().setMessage("Previous value for " + record.key() + " is " + previousValue).build()));
}
}
}
1 | Your Processor is declared with the annotation as for a regular processor. |
2 | The definition and initialization of your state store. |
StateStore Customization
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
public void fillConfiguration(Configuration configuration) {
List<StoreConfiguration> storeConfigurations = new ArrayList<>();
// Add a key value store for indexes
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("ping-data"),
Serdes.String(),
Serdes.String());
storeConfigurations.add(new StoreConfiguration(storeBuilder));
configuration.setStoreConfigurations(storeConfigurations);
}
}
The io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer
implementation provided sets the required State-Store configurations into the method’s
io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration
object.
In this example a ping-data
key/value store has been created.
Punctuation
Kafka Streams allows you to define Punctuators, which are a sort of scheduled task that Kafka Streams triggers (Kafka Streams documentation). One key issue with Punctuators is that they do not support exceptions:

- a checked Exception cannot be thrown, as the method signature does not allow it
- a RuntimeException is not caught by Kafka Streams, so it basically crashes your whole microservice
To work around the latter point and increase stability, the quarkus-kafka-streams-processor extension wraps a Punctuator before it is added to Kafka Streams, in order to catch the RuntimeException and log an error instead.
Exceptions are also counted with a dedicated metric.
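As an illustration, a minimal sketch of a Punctuator whose runtime failures would be caught and logged by the extension’s wrapper; the CleanupPunctuator name and its logic are hypothetical:
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueStore;

public class CleanupPunctuator implements Punctuator {
    private final KeyValueStore<String, String> store;

    public CleanupPunctuator(KeyValueStore<String, String> store) {
        this.store = store;
    }

    @Override
    public void punctuate(long timestamp) {
        // may throw a RuntimeException at runtime (e.g. if the store is unavailable);
        // the extension's wrapper catches it instead of crashing the microservice
        System.out.println("Entries in store at " + timestamp + ": " + store.approximateNumEntries());
    }
}
Such a Punctuator would be registered from the processor’s init method via context.schedule(…), as shown in the stateful example above.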
Custom decorators
The extension proposes some capabilities to customize more finely the behaviour of the different decoration layers, in addition to the ones that this extension already brings to the table.
Processor decorator
The following decoration layer is already extensively used in this extension’s source code and allows composition around the main processor class you have to define. Example of a new decorator:
@Decorator (1)
@Priority(150) (2)
public class ProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { (3)
@lombok.experimental.Delegate(excludes = Excludes.class)
private final Processor<KIn, VIn, KOut, VOut> delegate; (4)
private final MyBean bean;
@Inject
public ProcessorDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate, MyBean bean) { (5)
this.delegate = delegate;
this.bean = bean;
}
@Override
public void process(Record<KIn, VIn> record) { (6)
// use bean before
delegate.process(record);
// use bean after
}
private interface Excludes {
<KIn, VIn> void process(Record<KIn, VIn> record);
}
}
1 | Decorator annotation to profit from the decorator feature of CDI |
2 | Force the instantiation of the decorator with the Priority annotation.
Otherwise, the decorator is not taken into account by Quarkus.
The priority is to be set based on the priorities of the existing decorators, which are listed in ProcessorDecoratorPriorities.java. |
3 | The decorator should have the same generics declaration <KIn, VIn, KOut, VOut> as the Processor<KIn, VIn, KOut, VOut> interface that it implements |
4 | Delegate reference to use when decorating methods.
It is annotated with lombok’s Delegate annotation to generate
passthrough decorated methods that this Decorator class won’t decorate.
The selection is done through a blacklist of method signatures gathered in a private Excludes interface declared at the end of the class. |
5 | Injection constructor, which must have a delegate argument annotated with the Delegate annotation from CDI.
You can also, as for a regular CDI bean, inject any other CDI bean reference to be used in this decorator. |
6 | Example of decorated method, here the main process method of Processor API of Kafka Streams. |
Such a decorator will automatically be taken into account by CDI through the combination of the Decorator and Priority annotations.
The priority controls at which point your decorator is called among all the other decorators.
Producer interceptor
Kafka Streams already has the notion of a ProducerInterceptor. But like the rest of the Kafka Streams SPI, it is based on a class name and a default constructor for instantiation; it does not support CDI resolution.
This is why this extension’s API defines a ProducerOnSendInterceptor interface whose implementations are resolved through CDI.
Example of usage:
@ApplicationScoped (1)
public class HeaderAddingProducerInterceptor implements ProducerOnSendInterceptor { (2)
@Override
public int priority() { (3)
return 200;
}
@Override
public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> producerRecord) { (4)
producerRecord.headers().remove("header");
producerRecord.headers().add("header", "headervalue".getBytes(StandardCharsets.UTF_8));
return producerRecord;
}
}
1 | Producer interceptors are discovered by CDI through the ApplicationScoped annotation |
2 | The interceptor class should extend ProducerOnSendInterceptor.
ProducerOnSendInterceptor extends ProducerInterceptor<byte[], byte[]> and overrides some of its methods with default implementations, so that you do not have to implement them yourself. |
3 | This is not mandatory, but it allows controlling the order in which multiple interceptors, if defined, are called.
The default priority is 100. |
4 | The intercepted method call, which allows modifying the ProducerRecord before it is finally sent to Kafka.
In this example, we replace a header named header with the value headervalue. |
Punctuator decorator
A Kafka Streams Punctuator is a callback to use with ProcessorContext#schedule(…). It allows scheduling a periodic operation, driven either by the flow of incoming records (stream time) or by wall-clock time.
We propose in the extension a way to decorate any Punctuator a microservice would create.
The extension does not give the capability to narrow the decoration to only a given instance or class of Punctuator.
The decoration happens through the implementation of a DecoratedPunctuator interface, following the same idea as the Processor decorator: usage of the CDI Decorator feature.
Example of decoration:
@Decorator (1)
@Priority(150) (2)
public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator { (3)
@lombok.experimental.Delegate(excludes = Excludes.class)
private final DecoratedPunctuator delegate; (4)
private final MyBean myBean;
@Inject
public CdiRequestContextPunctuatorDecorator(@Delegate DecoratedPunctuator delegate, MyBean myBean) { (5)
this.delegate = delegate;
this.myBean = myBean;
}
@Override
public void punctuate(long timestamp) { (6)
// use before punctuate the myBean reference
delegate.punctuate(timestamp);
// use after punctuate the myBean reference
}
private interface Excludes {
void punctuate(long timestamp);
}
}
1 | Decorator annotation to profit from the decorator feature of CDI |
2 | Force the instantiation of the decorator with the Priority annotation.
Otherwise, the decorator is not taken into account by Quarkus.
The priority is to be set based on the priorities of the existing decorators, which are listed in PunctuatorDecoratorPriorities.java. |
3 | The decorator should implement the DecoratedPunctuator interface defined by this extension.
It won’t be considered otherwise. |
4 | Delegate reference to use when decorating methods.
It is annotated with lombok’s Delegate
annotation to generate passthrough decorated methods that this Decorator class won’t decorate.
The selection is done through a blacklist of method signatures gathered in a private Excludes interface declared at the end of the class. |
5 | Injection constructor, which must have a delegate argument annotated with the Delegate annotation from CDI.
Also showcased here is the injection of another bean to be used in this decorator. |
6 | Example of decorated method, here the main punctuate method of the Punctuator interface. |
Extension configuration reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default |
---|---|---|
The Kafka Streams Processor property to define a unique topic for incoming messages | string | |
The Kafka Streams Processor property to define multiple topics for incoming messages (comma-separated list) | string | |
The Kafka topic for incoming messages for the given source name | string | |
The Kafka topics for outgoing messages | string | |
The Kafka topic for outgoing messages for the given sink name | string | |
Dead letter queue name | string | |
Global dead letter queue, to produce error messages not managed by the application | string | |
Global dead letter queue maximum request size | int | java.lang.Integer.MAX_VALUE |
Enable the production of the error message in the global DLQ when the application cannot manage the error | boolean | false |
Kafka Streams Processor error strategy | string | continue |
Max number of retries. See microprofile doc. | string | -1 |
The delay between retries. See microprofile doc. | long | 0 |
The unit for delay. Default is milliseconds. See microprofile doc. | ChronoUnit | MILLIS |
The max duration. See microprofile doc. | long | 180000 |
The unit for max duration. See microprofile doc. | ChronoUnit | MILLIS |
Jitter value to randomly vary retry delays. See microprofile doc. | long | 200 |
The delay unit for jitter. Default is milliseconds. See microprofile doc. | ChronoUnit | MILLIS |
The list of exception types that should trigger a retry. Default is the provided io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException. See microprofile doc. | Exception class names | |
The list of exception types that should not trigger a retry. Default is an empty list. See microprofile doc. | Exception class names | |
Configuration from other extension
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default |
---|---|---|
A comma-separated list of topic names. The microservice will only be ready once all these topics are present in the Kafka cluster. | list of string | |
A unique identifier for this Kafka Streams application. It is used as the Kafka consumer group id, so that multiple replicas of the microservice balance their partition workload. | string | ${quarkus.application.name} |
The number of milliseconds a producer is willing to wait before sending a batch out. | int | 0 |
The compression type for all data generated by the producer. | none, gzip, snappy, lz4, zstd | none |