Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Single topic to listen to.

If you need more than one, use topics() or sources().

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_TOPIC

string

List of topics to listen to.

If you need only one, use topic().

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_TOPICS

list of string

The topics Kafka Streams connects to for this source.

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_SOURCES__SOURCES__TOPICS

list of string

required
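The environment variable names in this table appear to follow the usual Quarkus (SmallRye Config) convention: every character of the property name that is not a letter, digit or underscore becomes an underscore, and the result is upper-cased. The sketch below illustrates that rule. The class name `EnvVarNames` is hypothetical, and the dotted property names passed to it are assumptions reconstructed from the environment variable names listed here, not copied from the extension.

```java
import java.util.Locale;

final class EnvVarNames {
    // Replace every character that is not a letter, digit or underscore with '_',
    // then upper-case the result.
    static String toEnvVarName(String propertyName) {
        return propertyName.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Assumed property names; "sources" stands for the name of a source.
        System.out.println(toEnvVarName("kafkastreamsprocessor.input.topic"));
        System.out.println(toEnvVarName("kafkastreamsprocessor.input.sources.\"sources\".topics"));
    }
}
```

Under this rule, the segment between the double underscores is the map key, so a source hypothetically named `pong` would be configured through KAFKASTREAMSPROCESSOR_INPUT_SOURCES__PONG__TOPICS.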

Whether cloud events are used on the input

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_IS_CLOUD_EVENT

boolean

false

Allows injecting custom configuration into the CloudEventDeserializer.

Currently, only one configuration property is supported: cloudevents.datamapper. Its value must be a reference to a CloudEventDataMapper, which cannot be expressed with the String value type used here.

However, the CloudEvents Java SDK could add other configuration properties later on, and this map makes quarkus-kafka-streams-processor more future-proof.

Environment variable: KAFKASTREAMSPROCESSOR_INPUT_CLOUD_EVENT_DESERIALIZER_CONFIG__CLOUD_EVENT_DESERIALIZER_CONFIG_

Map<String,String>

The processor is mono-output, so a single output topic is designated.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_TOPIC

string

The topic associated with this sink.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_SINKS__SINKS__TOPIC

string

required

Whether cloud events are used on the output

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_IS_CLOUD_EVENT

boolean

false

Allows injecting custom configuration into the CloudEventSerializer.

The possible values are documented in the CloudEvents Java SDK and in the quarkus-kafka-streams-processor user guide.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_CLOUD_EVENT_SERIALIZER_CONFIG__CLOUD_EVENT_SERIALIZER_CONFIG_

Map<String,String>

Defines the type field of the CloudEvent for all the configured sinks.

It is used only if OutputConfig#isCloudEvent() is true.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_CLOUD_EVENTS_TYPE

string

Defines the source field of the CloudEvent for all the configured sinks.

It is used only if OutputConfig#isCloudEvent() is true.

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_CLOUD_EVENTS_SOURCE

URI

Version of the CloudEvents spec to use

Environment variable: KAFKASTREAMSPROCESSOR_OUTPUT_CLOUD_EVENTS_SPEC_VERSION

v03, v1

v1

The name of the global state store topic

Environment variable: KAFKASTREAMSPROCESSOR_GLOBAL_STORES__GLOBAL_STORES__TOPIC

string

required

Topic to use as the dead-letter queue (DLQ).

Environment variable: KAFKASTREAMSPROCESSOR_DLQ_TOPIC

string

Allows injecting custom configuration into the CloudEventSerializer used to serialize the input cloud event to the DLQ.

The possible keys that can be used are documented in CloudEventOutputConfig#cloudEventSerializerConfig().

Environment variable: KAFKASTREAMSPROCESSOR_DLQ_CLOUD_EVENT_SERIALIZER_CONFIG__CLOUD_EVENT_SERIALIZER_CONFIG_

Map<String,String>

Global dead-letter queue topic to which error messages not managed by the application are produced.

Environment variable: KAFKASTREAMSPROCESSOR_GLOBAL_DLQ_TOPIC

string

Maximum message size, in bytes, for the global dead-letter queue.

Default is 2147483647 bytes (about 2 GB).

Environment variable: KAFKASTREAMSPROCESSOR_GLOBAL_DLQ_MAX_MESSAGE_SIZE

int

2147483647

Kafka error handling strategy.

Possible values are:

  • continue: (default) drop the message and continue processing

  • dead-letter-queue: send the message to the DLQ and continue processing

  • fail: (not implemented yet) fail and stop processing further messages

Environment variable: KAFKASTREAMSPROCESSOR_ERROR_STRATEGY

string

continue
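As an illustration of the three strategy values listed above, the following minimal sketch parses the configured string into an enum. `ErrorStrategy`, `parse` and `sendsToDlq` are hypothetical names chosen for this example; they are not the extension's own types.

```java
import java.util.Locale;

// Hypothetical enum mirroring the three documented strategy values.
enum ErrorStrategy {
    CONTINUE, DEAD_LETTER_QUEUE, FAIL;

    // Parse the configured string, e.g. "dead-letter-queue" -> DEAD_LETTER_QUEUE.
    static ErrorStrategy parse(String value) {
        if (value == null || value.isBlank()) {
            return CONTINUE; // documented default
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
    }

    // Only the dead-letter-queue strategy forwards the failed message to the DLQ topic.
    boolean sendsToDlq() {
        return this == DEAD_LETTER_QUEUE;
    }
}
```

An unknown value makes `valueOf` throw an IllegalArgumentException, which is one reasonable way to surface a misconfiguration early.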

Max number of retries.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_MAX_RETRIES

int

-1

The delay between retries.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DELAY

long

0

The unit for delay().

Default is milliseconds.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DELAY_UNIT

nanos, micros, millis, seconds, minutes, hours, half-days, days, weeks, months, years, decades, centuries, millennia, eras, forever

millis

The max duration.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_MAX_DURATION

long

180000

The duration unit for max-duration().

Milliseconds by default.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_DURATION_UNIT

nanos, micros, millis, seconds, minutes, hours, half-days, days, weeks, months, years, decades, centuries, millennia, eras, forever

millis
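To show how the retry count and the maximum duration could bound retries together, here is a minimal sketch. It rests on two assumptions that this table does not confirm: that a negative max-retries value (the default is -1) means an unlimited number of retries, and that retries stop once the elapsed time reaches the maximum duration. `RetryBudget` and `mayRetry` are hypothetical names.

```java
import java.time.Duration;

final class RetryBudget {
    // Assumption: a negative maxRetries means there is no limit on the number of retries.
    // Assumption: no further retry is attempted once elapsed >= maxDuration.
    static boolean mayRetry(int retriesSoFar, Duration elapsed, int maxRetries, Duration maxDuration) {
        boolean retriesLeft = maxRetries < 0 || retriesSoFar < maxRetries;
        boolean timeLeft = elapsed.compareTo(maxDuration) < 0;
        return retriesLeft && timeLeft;
    }
}
```

With the documented defaults (-1 retries, 180000 milliseconds), this sketch would keep retrying for up to three minutes regardless of the number of attempts.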

Jitter value used to randomly vary retry delays.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_JITTER

long

200

The delay unit for jitter().

Default is milliseconds.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_JITTER_DELAY_UNIT

nanos, micros, millis, seconds, minutes, hours, half-days, days, weeks, months, years, decades, centuries, millennia, eras, forever

millis
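The unit names accepted by the delay, duration and jitter properties appear to correspond to the constants of java.time.temporal.ChronoUnit written in lower case with hyphens. The sketch below converts such a name and computes one retry delay. It assumes that the jitter shifts the delay by a random offset between minus and plus the jitter value and that the result is never negative; this table does not state either point. `RetryDelays` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Random;

final class RetryDelays {
    // Map a documented unit name such as "half-days" to a ChronoUnit constant.
    static ChronoUnit parseUnit(String name) {
        return ChronoUnit.valueOf(name.toUpperCase(Locale.ROOT).replace('-', '_'));
    }

    // Delay before the next attempt: the base delay shifted by a random offset
    // in [-jitter, +jitter], clamped at zero. Jitter is assumed to be non-negative.
    // Duration.of only accepts units up to days; coarser units need another conversion.
    static Duration nextDelay(long delay, String delayUnit, long jitter, String jitterUnit, Random random) {
        long baseMs = Duration.of(delay, parseUnit(delayUnit)).toMillis();
        long jitterMs = Duration.of(jitter, parseUnit(jitterUnit)).toMillis();
        long offset = (long) (random.nextDouble() * (2 * jitterMs + 1)) - jitterMs;
        return Duration.ofMillis(Math.max(0, baseMs + offset));
    }
}
```

With the documented defaults (delay 0, jitter 200, both in milliseconds), this sketch yields a delay between 0 and 200 milliseconds.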

The list of exception types that should trigger a retry.

Default is the extension’s RetryableException.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_RETRY_ON

list of string

io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException

The list of exception types that should not trigger a retry.

Default is an empty list.

Environment variable: KAFKASTREAMSPROCESSOR_RETRY_ABORT_ON

list of string
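The interplay between the retry-on and abort-on lists can be sketched as follows. The sketch assumes that abort-on takes precedence over retry-on and that a listed type also matches its subclasses; neither point is stated in this table. `RetryDecision` and `shouldRetry` are hypothetical names, and standard JDK exceptions stand in for the extension's RetryableException.

```java
import java.util.List;

final class RetryDecision {
    // Returns true if the failure should trigger a retry.
    // Assumption: abort-on is checked first and wins over retry-on.
    // Assumption: a listed type matches instances of its subclasses too.
    static boolean shouldRetry(Throwable failure,
                               List<Class<? extends Throwable>> retryOn,
                               List<Class<? extends Throwable>> abortOn) {
        for (Class<? extends Throwable> type : abortOn) {
            if (type.isInstance(failure)) {
                return false;
            }
        }
        for (Class<? extends Throwable> type : retryOn) {
            if (type.isInstance(failure)) {
                return true;
            }
        }
        return false; // not listed anywhere: no retry
    }
}
```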