
This guide provides an in-depth look at Apache Kafka and the SmallRye Reactive Messaging framework. For a quick start, take a look at Getting Started to Quarkus Messaging with Apache Kafka.

You can add the messaging-kafka extension to your project by running the following command in your project base directory:

quarkus extension add messaging-kafka
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-kafka'
Gradle
./gradlew addExtension --extensions='messaging-kafka'

This will add the following to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Kafka connector, a message corresponds to a Kafka record.

Messages transit on channels. Application components connect to channels to publish and consume messages. The Kafka connector maps channels to Kafka topics.

Channels are connected to message backends using connectors. A connector is configured to map incoming messages to a specific channel (consumed by the application) and to collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Kafka is named smallrye-kafka.

For production you must configure the broker location. You can do so globally or per channel using the mp.messaging.incoming.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When this property is not provided, it defaults to localhost:9092. Configure the connector to manage the prices channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.
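For reference, a minimal incoming channel configuration matching the description above might look as follows (the prices channel name is taken from this guide; treat the exact values as an illustrative sketch):

%prod.kafka.bootstrap.servers=kafka:9092
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices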
Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.messaging.auto-connector-attachment=false
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }
}

Your application can also consume the incoming messages in several other ways:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

The Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually. We will explore different acknowledgment strategies in Commit Strategies.

If you want to access the Kafka record objects directly, use:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    Double value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord is provided by the underlying Kafka client and can be injected directly into the consumer method. Another simpler approach is to use Record:

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    Double value = record.value(); // Can be `null` if the incoming record has no value
}

Record is a simple wrapper around the key and payload of the incoming Kafka record.

@Channel

Alternatively, your application can inject a Multi in your bean and subscribe to its events, as in the following example:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
    @Inject
    @Channel("prices")
    Multi<Double> prices;
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;

This example shows how you can integrate a Kafka consumer with another downstream; here, we expose it as a Server-Sent Events endpoint.

When consuming messages with @Channel, the application code is responsible for the subscription. In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.

@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically and supports multiple subscribers. If your injected channel receives Message (Multi<Message<T>>), you are responsible for the acknowledgment and the broadcasting. We will explore sending broadcast messages in Broadcasting messages on multiple consumers.

4.1. Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation, indicating that the processing is blocking and should not be run on the caller thread.

For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }
}

The complete example is available in the kafka-panache-quickstart directory.

The two @Blocking annotations (io.smallrye.reactive.messaging.annotations.Blocking and io.smallrye.common.annotation.Blocking) have the same effect, so you can use either. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, also used with other Quarkus reactive features, uses the default worker pool and preserves the order.
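As an illustration of the fine-grained variant, the sketch below uses the SmallRye @Blocking annotation with a named worker pool and unordered processing; the pool name my-pool and the consumer class are examples, not values from this guide:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class BlockingPriceConsumer {

    // "my-pool" is a hypothetical custom worker pool name;
    // ordered = false allows records to be processed concurrently, out of order.
    @Incoming("prices")
    @Blocking(value = "my-pool", ordered = false)
    public void store(double price) {
        // blocking work (for example, a JDBC call) runs on the worker pool
    }
}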

Detailed information on the usage of @Blocking annotation can be found in SmallRye Reactive Messaging – Handling blocking execution.

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price

If the consumer method receives a Message, the acknowledgment strategy is Strategy.MANUAL and the consumer method is in charge of acking/nacking the message.

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();

As indicated above, the method can also set the acknowledgment strategy to PRE_PROCESSING or NONE.

4.3. Commit Strategies

When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.

Committing an offset every time has performance penalties, as Kafka offset management can be slow. However, committing offsets too infrequently may lead to message duplication if the application crashes between two commits.

The Kafka connector supports three strategies:

throttled keeps track of received messages and commits an offset of the latest acked message in sequence (meaning, all previous messages were also acked). This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000 ms) commits the highest consecutive offset. The connector will be marked as unhealthy if a message associated with a record is not acknowledged in throttled.unprocessed-record-max-age.ms (default: 60000 ms). Indeed, this strategy cannot commit the offset as soon as a single record processing fails. If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are "poison pill" messages (that are never acked). This strategy is the default if enable.auto.commit is not explicitly set to true.

checkpoint allows persisting consumer offsets on a state store, instead of committing them back to the Kafka broker. Using the CheckpointMetadata API, consumer code can persist a processing state with the record offset to mark the progress of a consumer. When the processing continues from a previously persisted offset, it seeks the Kafka consumer to that offset and also restores the persisted state, continuing the stateful processing from where it left off. The checkpoint strategy holds locally the processing state associated with the latest offset, and persists it periodically to the state store (period specified by auto.commit.interval.ms (default: 5000)). The connector will be marked as unhealthy if no processing state is persisted to the state store in checkpoint.unsynced-state-max-age.ms (default: 10000). If checkpoint.unsynced-state-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. For more information, see Stateful processing with Checkpointing

latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. Specifically, the offset of the most recent acknowledged message will always be committed, even if older messages have not finished being processed. In case of an incident such as a crash, processing would restart after the last commit, leading to older messages never being successfully and fully processed, which would appear as message loss. This strategy should not be used in high load environment, as offset commit is expensive. However, it reduces the risk of duplicates.

ignore performs no commit. This strategy is the default when the consumer is explicitly configured with enable.auto.commit set to true. It delegates the offset commit to the underlying Kafka client. When enable.auto.commit is true, this strategy does not guarantee at-least-once delivery: SmallRye Reactive Messaging processes records asynchronously, so offsets may be committed for records that have been polled but not yet processed. In case of failure, only records that were not committed yet will be re-processed.
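The strategy is selected per channel through the commit-strategy attribute; a minimal sketch, reusing the prices channel from this guide:

mp.messaging.incoming.prices.commit-strategy=throttled
# throttled is the default unless enable.auto.commit is explicitly set to true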

dead-letter-queue.topic: the topic to use to write the records not processed correctly; defaults to dead-letter-topic-$channel, with $channel being the name of the channel.

dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it is deduced from the key deserializer.

dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it is deduced from the value deserializer.
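A configuration sketch enabling this failure strategy for the prices channel (topic and serializer values are illustrative):

mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
mp.messaging.incoming.prices.dead-letter-queue.topic=dead-letter-topic-prices
mp.messaging.incoming.prices.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer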

Smallrye Reactive Messaging enables implementing custom failure strategies. See SmallRye Reactive Messaging documentation for more information.

4.4.1. Retrying processing

You can combine Reactive Messaging with SmallRye Fault Tolerance and retry the processing if it failed:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception

You can configure the delay, the number of retries, the jitter, and so on.

If your method returns a Uni or a CompletionStage, you need to add the @NonBlocking annotation:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type
    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));

To use this failure handler, the bean must be exposed with the @Identifier qualifier, and the connector configuration must specify the attribute mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler (for key or value deserializers).
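For example, registering the failure-retry handler above as the value deserialization failure handler of a hypothetical orders channel could look like this:

mp.messaging.incoming.orders.value-deserialization-failure-handler=failure-retry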

The handler is called with the deserialization details, including the action represented as a Uni<T>. On this Uni, failure strategies such as retry, providing a fallback value, or applying a timeout can be implemented.

If you don’t configure a deserialization failure handler and a deserialization failure happens, the application is marked unhealthy. You can also ignore the failure, which will log the exception and produce a null value. To enable this behavior, set the mp.messaging.incoming.$channel.fail-on-deserialization-failure attribute to false.

If the fail-on-deserialization-failure attribute is set to false and the failure-strategy attribute is dead-letter-queue the failed record will be sent to the corresponding dead letter queue topic.

4.5. Consumer Groups

In Kafka, a consumer group is a set of consumers which cooperate to consume data from the same topic. A topic is divided into a set of partitions. The partitions of a topic are assigned among the consumers in the group, effectively scaling the consumption throughput. Note that each partition is assigned to a single consumer from the group. However, a consumer can be assigned multiple partitions if the number of partitions is greater than the number of consumers in the group.

Let's explore briefly the different producer/consumer patterns and how to implement them using Quarkus:

For a given application instance, the number of consumers inside the consumer group can be configured using the mp.messaging.incoming.$channel.concurrency property. The partitions of the subscribed topic will be divided among the consumer threads. Note that if the concurrency value exceeds the number of partitions of the topic, some consumer threads won't be assigned any partitions.
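As a sketch, assuming the prices topic has at least three partitions, the following would run three consumers in the same group for that channel:

mp.messaging.incoming.prices.concurrency=3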

The concurrency attribute provides a connector agnostic way for non-blocking concurrent channels and replaces the Kafka connector specific partitions attribute. The partitions attribute is therefore deprecated and will be removed in future releases.

A common business requirement is to consume and process Kafka records in order. The Kafka broker preserves the order of records inside a partition, not inside a topic. Therefore, it is important to think about how records are partitioned inside a topic. The default partitioner uses the record key hash to compute the partition for a record, or, when the key is not defined, chooses a partition randomly per batch of records.

With normal operation, a Kafka consumer preserves the order of records inside each partition assigned to it. SmallRye Reactive Messaging keeps this order for processing, unless @Blocking(ordered = false) is used (see Blocking processing).

Note that due to consumer rebalances, Kafka consumers only guarantee at-least-once processing of single records, meaning that uncommitted records can be processed again by consumers.

4.5.1. Consumer Rebalance Listener

Inside a consumer group, as new group members arrive and old members leave, the partitions are re-assigned so that each member receives a share of the partitions. This is known as rebalancing the group. To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener. To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and expose it as a CDI bean with the @Identifier qualifier. A common use case is to store offsets in a separate data store to implement exactly-once semantics, or to start the processing at a specific offset.

The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, it invokes the partitionsAssigned callback with the initial set of topics/partitions associated with the consumer. If, later, this set changes, it calls the partitionsRevoked and partitionsAssigned callbacks again, so you can implement custom logic.

Note that the rebalance listener methods are called from the Kafka polling thread and block the caller thread until completion. That's because the rebalance protocol has synchronization barriers, and asynchronous code in a rebalance listener may be executed after the synchronization barrier.

When topics/partitions are assigned to or revoked from a consumer, it pauses the message delivery and resumes once the rebalance completes.

If the rebalance listener handles offset commit on behalf of the user (using the NONE commit strategy), the rebalance listener must commit the offset synchronously in the partitionsRevoked callback. We also recommend applying the same logic when the application stops.

Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.

In the following example we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First we need to provide a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener and is annotated with io.smallrye.common.annotation.Identifier. We then must configure our inbound connector to use this bean.

package inbound;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {
    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());
    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago
        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class KafkaRebalancedConsumer {
    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }
}

To configure the inbound connector to use the provided listener, we either set the consumer rebalance listener's name: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Or set the listener's name to be the same as the consumer group id:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

Note that setting the consumer rebalance listener's name takes precedence over using the consumer group id.

4.5.2. Using a unique consumer group

If you want to process all the records from a topic (from its beginning), you need to:

4.5.3. Manual topic-partition assignment

The assign-seek channel attribute allows manually assigning topic-partitions to a Kafka incoming channel, and optionally seeking to a specified offset in the partition to start consuming records. If assign-seek is used, the consumer will not be dynamically subscribed to topics, but instead will statically assign the described partitions. With manual topic-partition assignment, rebalancing doesn't happen and therefore rebalance listeners are never called.

The attribute takes a list of triplets separated by commas: <topic>:<partition>:<offset>.

For example, the configuration

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

assigns the consumer to partition 0 of topic 'topic1', setting the initial position at offset 10, and to partition 1 of topic 'topic2', setting the initial position at offset 20.

If the offset is omitted, partitions are assigned to the consumer but won’t be sought to offset.

If offset is 0, it seeks to the beginning of the topic-partition.

If offset is -1, it seeks to the end of the topic-partition.

4.6. Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually. Under the hood, the Kafka consumer client constantly polls the broker and receives records in batches, presented inside the ConsumerRecords container.

In batch mode, your application can receive all the records returned by the consumer poll in one go.

To achieve this, you need to specify a compatible container type to receive all the data:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price

The incoming method can also receive Message<List<Payload>>, Message<ConsumerRecords<Key, Payload>>, and ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        Double payload = record.value();
        String topic = record.topic();
        // process messages
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();

Note that the successful processing of the incoming record batch commits the latest offsets for each partition received inside the batch. The configured commit strategy is applied only for these records.

Conversely, if the processing throws an exception, all messages are nacked, applying the failure strategy to all the records inside the batch.

4.7. Stateful processing with Checkpointing

SmallRye Reactive Messaging checkpoint commit strategy allows consumer applications to process messages in a stateful manner, while also respecting Kafka consumer scalability. An incoming channel with the checkpoint commit strategy persists consumer offsets on an external state store, such as a relational database or a key-value store. As a result of processing consumed records, the consumer application can accumulate an internal state for each topic-partition assigned to the Kafka consumer. This local state is periodically persisted to the state store and is associated with the offset of the record that produced it.

This strategy does not commit any offsets to the Kafka broker, so when new partitions get assigned to the consumer, i.e. consumer restarts or consumer group instances scale, the consumer resumes the processing from the latest checkpointed offset with its saved state.

The @Incoming channel consumer code can manipulate the processing state through the CheckpointMetadata API. For example, a consumer calculating the moving average of prices received on a Kafka topic would look the following:

package org.acme;
import java.util.concurrent.CompletionStage;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
@ApplicationScoped
public class MeanCheckpointConsumer {
    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);
        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);
        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    static class AveragePrice {
        long count;
        double mean;
        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;

The transform method applies the transformation function to the current state, producing a changed state and registering it locally for checkpointing. By default, the local state is persisted to the state store periodically, with the period specified by auto.commit.interval.ms (default: 5000). If the persistOnAck flag is given, the latest state is persisted to the state store eagerly on message acknowledgment. The setNext method works similarly, directly setting the latest state.

The checkpoint commit strategy tracks when a processing state is last persisted for each topic-partition. If an outstanding state change can not be persisted for checkpoint.unsynced-state-max-age.ms (default: 10000), the channel is marked unhealthy.

4.7.1. State stores

State store implementations determine where and how the processing states are persisted. This is configured by the mp.messaging.incoming.[channel-name].checkpoint.state-store property. The serialization of state objects depends on the state store implementation. Instructing state stores about serialization may require configuring the class name of the state objects using the mp.messaging.incoming.[channel-name].checkpoint.state-type property.

Quarkus provides the following state store implementations:

quarkus-redis: Uses the quarkus-redis-client extension to persist processing states. Jackson is used to serialize processing state in Json. For complex objects it is required to configure the checkpoint.state-type property with the class name of the object. By default, the state store uses the default redis client, but if a named client is to be used, the client name can be specified using the mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name property. Processing states will be stored in Redis using the key naming scheme [consumer-group-id]:[topic]:[partition].

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://localhost:7000
quarkus.redis.my-redis.password=<redis-pwd>

quarkus-hibernate-reactive: Uses the quarkus-hibernate-reactive extension to persist processing states. Processing state objects are required to be a Jakarta Persistence entity and extend the CheckpointEntity class, which handles object identifiers composed of the consumer group id, topic and partition. Therefore, the class name of the entity needs to be configured using the checkpoint.state-type property.

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

With AveragePriceEntity being a Jakarta Persistence entity extending CheckpointEntity:

package org.acme;
import jakarta.persistence.Entity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;
    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;

quarkus-hibernate-orm: Uses the quarkus-hibernate-orm extension to persist processing states. It is similar to the previous state store, but it uses Hibernate ORM instead of Hibernate Reactive.

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

For instructions on how to implement custom state stores, see Implementing State Stores.

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
For production you must configure the broker location. You can do so globally or per channel using mp.messaging.outgoing.$channel.bootstrap.servers. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092. Configure the connector to manage the prices-out channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class KafkaPriceProducer {
    private final Random random = new Random();
    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());

Note that the generate method returns a Multi<Double>, which implements the Reactive Streams Publisher interface. This publisher is used by the framework to generate messages and send them to the configured Kafka topic.

To send key/value pairs, instead of a plain payload you can return an io.smallrye.reactive.messaging.kafka.Record directly:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));

The payload can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));

OutgoingKafkaRecordMetadata allows setting metadata attributes of the Kafka record, such as key, topic, partition, or timestamp. One use case is dynamically selecting the target topic of a message. In this case, instead of configuring the topic in your configuration file, you need to use the outgoing metadata to set the topic name.

In addition to method signatures returning a Reactive Stream Publisher (Multi being an implementation of Publisher), the outgoing method can also return a single message. In this case, the producer uses this method as a generator to create an infinite stream.

@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. Sending messages with Emitter

Sometimes, you need to have an imperative way of sending messages.

For example, you may need to send a message to a stream when receiving a POST request inside a REST endpoint. In this case, you cannot use @Outgoing because your method has parameters.

For this, you can use an Emitter:

import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);

Sending a payload returns a CompletionStage, completed when the message is acked. If the message transmission fails, the CompletionStage completes exceptionally with the reason of the nack.

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
    @Inject @Channel("price-create") Emitter<Double> priceEmitter;
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

If you prefer using Reactive APIs, you can use MutinyEmitter, whose send method returns a Uni<Void>. You can therefore use Mutiny APIs for handling the downstream messages and errors.

import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.reactive.messaging.MutinyEmitter;
@Path("/prices")
public class PriceResource {
    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return priceEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

It is also possible to block on sending the event to the emitter with the sendAndAwait method. It only returns from the method when the event is acked or nacked by the receiver.
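A minimal sketch reusing the MutinyEmitter from the previous example (the blocking variant of the endpoint is illustrative):

@POST
@Consumes(MediaType.TEXT_PLAIN)
public String addPriceBlocking(Double price) {
    // Blocks until the message is acked or nacked by the receiver
    priceEmitter.sendAndAwait(price);
    return "ok";
}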

By default, the connector waits for Kafka to acknowledge the record before continuing the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

Note that the acks attribute has a huge impact on the record acknowledgement.

If a record cannot be written, the message is nacked.
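A configuration sketch for the prices-out channel illustrating these two attributes (the values shown are examples, not recommendations):

mp.messaging.outgoing.prices-out.waitForWriteCompletion=false
mp.messaging.outgoing.prices-out.acks=1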

5.3. Back-pressure

The Kafka outbound connector handles back-pressure by monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one in-flight message gets acknowledged by the broker. Then, the connector writes a new message to Kafka when one of the broker's in-flight messages gets acknowledged. Be sure to configure Kafka's batch.size and linger.ms accordingly.

You can also remove the limit of in-flight messages by setting max-inflight-messages to 0. However, note that the Kafka producer may block if the number of requests reaches max.in.flight.requests.per.connection.
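For example, to raise the limit for the prices-out channel (the value is illustrative):

mp.messaging.outgoing.prices-out.max-inflight-messages=4096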

5.4. Retrying message dispatch

When the Kafka producer receives an error from the server, if it is a transient, recoverable error, the client retries sending the batch of messages. This behavior is controlled by the retries and retry.backoff.ms parameters. In addition, SmallRye Reactive Messaging retries individual messages on recoverable errors, depending on the retries and delivery.timeout.ms parameters.

Note that while having retries in a reliable system is a best practice, the max.in.flight.requests.per.connection parameter defaults to 5, meaning that the order of the messages is not guaranteed. If message order is a must for your use case, setting max.in.flight.requests.per.connection to 1 will make sure a single batch of messages is sent at a time, at the expense of limiting the throughput of the producer.

For applying retry mechanisms on processing errors, see the Retrying processing section.

5.5. Handling Serialization Failures

For the Kafka producer client, serialization failures are not recoverable, so the message dispatch is not retried. In these cases you may need to apply a failure strategy for the serializer. To achieve this, create a bean implementing the SerializationFailureHandler<T> interface:

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type
    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();

To use this failure handler, the bean must be exposed with the @Identifier qualifier, and the connector configuration must specify the attribute mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler (for key or value serializers).

The handler is called with the serialization details, including the action represented as a Uni<byte[]>. Note that the method must await the result and return the serialized byte array.

5.6. In-memory channels

In some use cases, it is convenient to use the messaging patterns to transfer messages inside the same application. When you don't connect a channel to a messaging backend like Kafka, everything happens in-memory, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.

The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an in-memory channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming, or using an unmanaged stream).

5.7. Broadcasting messages on multiple consumers

By default, a channel can be linked to a single consumer, using an @Incoming method or an injected @Channel reactive stream. At application startup, channels are verified to form a chain of consumers and producers with a single consumer and producer. You can override this behavior by setting mp.messaging.$channel.broadcast=true on a channel.
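A sketch of enabling broadcast on the prices channel so that several @Incoming methods can consume it, assuming a connector-managed incoming channel (the channel name is taken from this guide):

mp.messaging.incoming.prices.broadcast=true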

In the case of in-memory channels, the @Broadcast annotation can be used on the @Outgoing method. For example:

import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;
@ApplicationScoped
public class MultipleConsumer {
    private final Random random = new Random();
    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MultipleProducers {
    private final Random random = new Random();
    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();

In the previous example, the generated price is broadcast to both outbound channels. The following example selectively sends messages to multiple outgoing channels using the Targeted container object, with the channel name as key and the message payload as value.

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.Targeted;
@ApplicationScoped
public class TargetedProducers {
    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        return targeted;

Note that the auto-detection for Kafka serializers doesn’t work for signatures using the Targeted.

For more details on using multiple outgoings, please refer to the SmallRye Reactive Messaging documentation.

5.8. Kafka Transactions

Kafka transactions enable atomic writes to multiple Kafka topics and partitions. The Kafka connector provides the KafkaTransactions custom emitter for writing Kafka records inside a transaction. It can be injected as a regular @Channel emitter:

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
@ApplicationScoped
public class KafkaTransactionalProducer {
    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;
    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();

The function given to the withTransaction method receives a TransactionalEmitter for producing records, and returns a Uni that provides the result of the transaction.

Kafka transactional producers require configuring the acks=all client property, and a unique id for transactional.id, which implies enable.idempotence=true. When Quarkus detects the use of KafkaTransactions for an outgoing channel, it configures these properties on the channel, providing a default value of "${quarkus.application.name}-${channelName}" for the transactional.id property.

Note that for production use, the transactional.id must be unique across all application instances.
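If the default is not suitable, the id can be set explicitly on the channel; a sketch for the tx-out-example channel, with an illustrative value:

mp.messaging.outgoing.tx-out-example.transactional.id=my-service-instance-1-tx-out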

While a normal message emitter supports concurrent calls to send and consequently queues outgoing messages to be written to Kafka, a KafkaTransactions emitter only supports one transaction at a time. A transaction is considered in progress from the call to withTransaction until the returned Uni results in success or failure. While a transaction is in progress, subsequent calls to withTransaction, including nested calls inside the given function, will throw an IllegalStateException.

Note that in Reactive Messaging, the execution of processing methods is already serialized, unless @Blocking(ordered = false) is used. If withTransaction can be called concurrently, for example from a REST endpoint, it is recommended to limit the concurrency of the execution. This can be done using the @Bulkhead annotation from MicroProfile Fault Tolerance.

A usage example can be found in Chaining Kafka Transactions with Hibernate Reactive transactions.

The Kafka Request-Reply pattern allows you to publish a request record to a Kafka topic and then await a reply record that responds to the initial request. The Kafka connector provides the KafkaRequestReply custom emitter that implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

It can be injected as a regular emitter @Channel:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;
@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {
    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;
    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);

The request method publishes the record to the configured target topic of the outgoing channel, and polls a reply topic (by default, the target topic with -replies suffix) for a reply record. When the reply is received the returned Uni is completed with the record value. The request send operation generates a correlation id and sets a header (by default REPLY_CORRELATION_ID), which it expects to be sent back in the reply record.

The replier can be implemented using a Reactive Messaging processor (see Processing Messages).
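A minimal sketch of such a replier, assuming hypothetical request and reply channel names mapped to the requestor's target topic and its reply topic (the correlation-id header handling is described in the SmallRye documentation):

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class RequestReplier {

    // Hypothetical channels: "request" reads the requestor's target topic,
    // "reply" writes to the reply topic polled by KafkaRequestReply.
    @Incoming("request")
    @Outgoing("reply")
    public String process(Integer request) {
        return "Processed: " + request;
    }
}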

For more information on Kafka Request Reply feature and advanced configuration options, see the Smallrye Reactive Messaging Documentation.

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceProcessor {
    private static final double CONVERSION_RATE = 0.88;
    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;

The parameter of the process method is the incoming message payload, while the return value is used as the outgoing message payload. The previously mentioned signatures for parameter and return types are also supported, such as Message<T>, Record<K, V>, etc.

You can apply asynchronous stream processing by consuming and returning a reactive stream Multi<T> type:

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
public class PriceProcessor {
    private static final double CONVERSION_RATE = 0.88;
    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);

7.1. Propagating Record Key

When processing messages, you can propagate the incoming record key to the outgoing record.

Enabled with the mp.messaging.outgoing.$channel.propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.

If the outgoing record already contains a key, it won't be overridden by the incoming record key. If the incoming record has a null key, the value of the mp.messaging.outgoing.$channel.key property is used.
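A sketch enabling key propagation on the price-out channel from the processing example above, with an illustrative fallback key:

mp.messaging.outgoing.price-out.propagate-record-key=true
# used when the incoming record key is null
mp.messaging.outgoing.price-out.key=default-key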

7.2. Exactly-Once Processing

Kafka transactions allow managing consumer offsets inside the transaction, together with produced messages. This in turn enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing.

The KafkaTransactions custom emitter also provides a way to apply exactly-once processing to incoming Kafka messages inside a transaction.

The following example processes a batch of Kafka records inside a transaction.

import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
@ApplicationScoped
public class KafkaExactlyOnceProcessor {
    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;
    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            return Uni.createFrom().voidItem();
The consumed message is passed to the KafkaTransactions#withTransactionAndAck in order to handle the offset commits and message acks.
The send method writes records to Kafka inside the transaction, without waiting for the send receipt from the broker. Messages pending to be written to Kafka are buffered and flushed before committing the transaction. It is therefore recommended to configure the @OnOverflow bufferSize to fit enough messages, for example the max.poll.records value, the maximum amount of records returned in a poll batch.

If the processing completes successfully, before committing the transaction, the topic partition offsets of the given batch of messages are committed to the transaction.

If the processing needs to be aborted, after aborting the transaction, the consumer's position is reset to the last committed offset, effectively resuming the consumption from that offset. If no consumer offset has been committed for a topic partition, the consumer's position is reset to the beginning of the topic partition, even if the offset reset policy is latest.

7.2.1. Error handling for exactly-once processing

The Uni returned from KafkaTransactions#withTransaction produces a failure if the transaction fails and is aborted. The application can choose to handle the error case, but if a failing Uni is returned from the @Incoming method, the incoming channel effectively fails and stops the reactive stream.

The KafkaTransactions#withTransactionAndAck method acks and nacks the message but does not return a failing Uni. Nacked messages are handled by the failure strategy of the incoming channel (see Error Handling Strategies). Configuring failure-strategy=ignore simply resets the Kafka consumer to the last committed offsets and resumes the consumption from there.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.kafka.clients.producer.ProducerRecord;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
@ApplicationScoped
public class PriceSender {
    @Inject
    KafkaClientService clientService;
    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
@ApplicationScoped
public class KafkaClients {
    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;
    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
        return KafkaAdminClient.create(copy);

The default-kafka-broker configuration map contains all application properties prefixed with kafka. or KAFKA_. For more configuration options, check Kafka Configuration Resolution.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
/**
 * A bean consuming data from the "fruit-in" channel and applying some price conversion.
 * The result is pushed to the "fruit-out" channel.
 */
@ApplicationScoped
public class FruitProcessor {
    private static final double CONVERSION_RATE = 0.88;
    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;

To do this, we will need to set up JSON serialization with Jackson or JSON-B.

9.1. Serializing via Jackson

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also generate the serializer and deserializer for you, so you do not have to configure anything. When generation is disabled, you can use the provided ObjectMapperSerializer and ObjectMapperDeserializer as explained below.

There is an existing ObjectMapperSerializer that can be used to serialize all data objects via Jackson. You may create an empty subclass if you want to use serializer/deserializer autodetection.
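A minimal sketch of such an empty subclass (the FruitSerializer name is illustrative); the FruitDeserializer shown next follows the same pattern on the deserialization side:

package com.acme.fruit.jackson;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;
public class FruitSerializer extends ObjectMapperSerializer<Fruit> {
}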

package com.acme.fruit.jackson;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);

Finally, configure your channels to use the Jackson serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit data object. In this case, the deserializer configuration is not necessary, as serializer/deserializer autodetection is enabled by default.

If you would like to deserialize a list of fruits, you need to create a deserializer with a Jackson TypeReference denoting the generic collection used:

package com.acme.fruit.jackson;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
package com.acme.fruit.jsonb;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);

Finally, configure your channels to use the JSON-B serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit data object.

If you would like to deserialize a list of fruits, you need to create a deserializer with a Type denoting the generic collection used:

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<Fruit>() {}.getClass().getGenericSuperclass());
    }
}

When using Quarkus Messaging with Kafka (io.quarkus:quarkus-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channels.

For instance, if you declare

@Outgoing("generated-price")
public Multi<Integer> generate() {

and your configuration indicates that the generated-price channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka's built-in IntegerSerializer.

Similarly, if you declare

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {

and your configuration indicates that the my-kafka-records channel uses the smallrye-kafka connector, then Quarkus will automatically set the key.deserializer to Kafka's built-in LongDeserializer, as well as the value.deserializer to ByteArrayDeserializer.

Finally, if you declare

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

and your configuration indicates that the price-create channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka's built-in DoubleSerializer.

The full list of types supported by the serializer/deserializer autodetection is:

In case you have any issues with serializer autodetection, you can switch it off completely by setting quarkus.messaging.kafka.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

This is described in a dedicated guide for Avro: Using Apache Kafka with Schema Registry and Avro. And a different one for JSON Schema: Using Apache Kafka with Schema Registry and JSON Schema.

# Disable both liveness and readiness checks with `health-enabled=false`:
# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false
# Disable only the readiness check with `health-readiness-enabled=false`:
mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false

With the health-topic-verification-enabled=true attribute, the startup probe uses an admin client to check for the list of topics, while the readiness probe for an incoming channel checks that at least one partition is assigned for consumption, and for an outgoing channel it checks that the topic used by the producer exists in the broker.

Note that to achieve this, an admin connection is required. You can adjust the timeout for topic verification calls to the broker using the health-topic-verification-timeout configuration.
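A sketch enabling topic verification for a channel, with an illustrative timeout value in milliseconds:

mp.messaging.incoming.your-channel.health-topic-verification-enabled=true
mp.messaging.incoming.your-channel.health-topic-verification-timeout=5000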

If the OpenTelemetry extension is present, then the Kafka connector channels work out-of-the-box with the OpenTelemetry Tracing. Messages written to Kafka topics propagate the current tracing span. On incoming channels, if a consumed Kafka record contains tracing information the message processing inherits the message span as parent.

Tracing can be disabled explicitly per channel:

mp.messaging.incoming.data.tracing-enabled=false

If the Micrometer extension is present, then Kafka producer and consumer clients metrics are exposed as Micrometer meters.

16.1. Channel metrics

Per channel metrics can also be gathered and exposed as Micrometer meters. The following metrics can be gathered per channel, identified with the channel tag:

The message observation depends on intercepting messages and therefore doesn’t support channels consuming messages with a custom message type such as IncomingKafkaRecord, KafkaRecord, IncomingKafkaRecordBatch or KafkaRecordBatch.

The message interception, and observation, still work with channels consuming the generic Message type, or custom payloads enabled by converters.

In JVM mode, it will work out of the box. However, to compile your application to a native executable, you need to add quarkus.kafka.snappy.enabled=true to your application.properties.

In native mode, Snappy is disabled by default, because using Snappy requires embedding a native library and unpacking it when the application starts.

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

This dependency provides the callback handler required to handle the OAuth workflow. Then, in application.properties, add:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
quarkus.ssl.native=true

Update the oauth.client.id, oauth.client.secret, and oauth.token.endpoint.uri values.

OAuth authentication works for both JVM and native modes. Since SSL is not enabled by default in native mode, quarkus.ssl.native=true must be added to support the JaasClientOauthLoginCallbackHandler, which uses SSL. (See the Using SSL with Native Executables guide for more details.)

@Outgoing("beverages") Beverage process(Order order) { System.out.println("Order received " + order.getProduct()); Beverage beverage = new Beverage(); beverage.setBeverage(order.getProduct()); beverage.setCustomer(order.getCustomer()); beverage.setOrderId(order.getOrderId()); beverage.setPreparationState("RECEIVED"); return beverage;

First, add the following test dependency to your application:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

Then, create a Quarkus test resource as follows:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {
    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)
    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)
        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        ordersIn.send(order);  (4)
        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)
        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());

If your Kafka consumer is batch based, you will need to send a batch of messages to the channel by creating them manually.

@ApplicationScoped
public class BeverageProcessor {
    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {
    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector;
    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)
        ordersIn.send(batch);
        await().until(committed::get);  (4)

20.1.1. Context propagation with InMemoryConnector

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

The quarkus-test-vertx dependency provides the @io.quarkus.test.vertx.RunOnVertxContext annotation, which, when used on a test method, executes the test on a Vert.x context.

However, most other connectors handle context propagation by dispatching messages on separate duplicated Vert.x contexts.

If your tests are dependent on context propagation, you can configure the in-memory connector channels with the run-on-vertx-context attribute to dispatch events, including messages and acknowledgements, on a Vert.x context. Alternatively you can switch this behaviour using the InMemorySource#runOnVertxContext method.
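For example, reusing the orders channel from the test above, a test that relies on context propagation could switch the in-memory source to Vert.x-context dispatching. This is only a sketch: it assumes the runOnVertxContext setter mentioned above accepts a boolean, and it reuses the Order type from the earlier example.

@Test
void testWithContextPropagation() {
    InMemorySource<Order> ordersIn = connector.source("orders");
    // Dispatch messages and acknowledgements on a duplicated Vert.x context,
    // mimicking what the Kafka connector does at runtime.
    ordersIn.runOnVertxContext(true);
    ordersIn.send(new Order());
    // ... assert on the corresponding sink as in the previous test ...
}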

20.2. Testing with a Kafka broker

If you are using [kafka-dev-services], a Kafka broker is started and available throughout the tests, unless it is disabled in the %test profile. While it is possible to connect to this broker using the Kafka client APIs, the Kafka Companion Library proposes an easier way of interacting with the Kafka broker and creating consumers, producers and admin actions inside tests.

To use the KafkaCompanion API in tests, start by adding the following dependency:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

It provides io.quarkus.test.kafka.KafkaCompanionResource — an implementation of io.quarkus.test.common.QuarkusTestResourceLifecycleManager.

Then configure the Kafka Companion in tests using @QuarkusTestResource, for example:

import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {
    @InjectKafkaCompanion (1)
    KafkaCompanion companion;
    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString())); (2)
        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic
        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}

If the Kafka Dev Service is available during tests, KafkaCompanionResource uses the created Kafka broker; otherwise, it creates a Kafka broker using the Strimzi Test Container.

The configuration of the created Kafka broker can be customized using @ResourceArg, for example:

@QuarkusTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}
public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. So, you don’t have to start a broker manually. The application is configured automatically.

If you need multiple (shared) brokers, you can configure the quarkus.kafka.devservices.service-name attribute and indicate the broker name. It looks for a container with the same name, and starts a new one if none can be found. The default service name is kafka.

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.kafka.devservices.shared=false.
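For example, two applications started in dev mode can share one broker by declaring the same service name; the value below is an arbitrary choice:

quarkus.kafka.devservices.service-name=my-shared-kafka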

21.3. Setting the port

By default, the Dev Services for Kafka pick a random port and configure the application. You can set the port by configuring the quarkus.kafka.devservices.port property.

Note that the Kafka advertised address is automatically configured with the chosen port.
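For example (the port value below is arbitrary):

quarkus.kafka.devservices.port=32790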

21.4. Configuring the image

Dev Services for Kafka supports Redpanda, kafka-native and Strimzi (in Kraft mode) images.

Redpanda is a Kafka compatible event streaming platform. Because it provides fast startup times, Dev Services defaults to Redpanda images from vectorized/redpanda. You can select any version from https://hub.docker.com/r/vectorized/redpanda.
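For instance, to pin the Redpanda image to a specific version, you can set the image name property (replace <tag> with a tag from the page above):

quarkus.kafka.devservices.image-name=vectorized/redpanda:<tag>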

kafka-native provides images of the standard Apache Kafka distribution compiled to a native binary using Quarkus and GraalVM. While still experimental, it provides very fast startup times with a small footprint.

The image type can be configured using:

quarkus.kafka.devservices.provider=kafka-native

Strimzi provides container images and Operators for running Apache Kafka on Kubernetes. While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments. Strimzi container images run a "genuine" Kafka broker on the JVM, which is slower to start.

quarkus.kafka.devservices.provider=strimzi

For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

21.6. Transactional and Idempotent producers support

By default, the Redpanda broker is configured to enable transactions and idempotence features. You can disable those using:

quarkus.kafka.devservices.redpanda.transaction-enabled=false

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), the Quarkus Dev UI is extended with a Kafka broker management UI. It is connected automatically to the Kafka broker configured for the application.

Reactive Messaging invokes the user's methods on an I/O thread. Thus, by default, the methods must not block. As described in [Blocking processing], you need to add the @Blocking annotation on the method if it blocks the caller thread.

See the Quarkus Reactive Architecture documentation for further details on this topic.
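As a minimal sketch of what this looks like in practice (the channel and class names below are assumptions, not taken from this guide):

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class BlockingOrderConsumer {

    @Incoming("orders")
    @Blocking   // invoked on a worker thread, so blocking calls are allowed here
    public void store(String order) {
        // e.g. a blocking JDBC insert would be safe here
    }
}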

If not set, key.deserializer is set to org.apache.kafka.common.serialization.StringDeserializer.

The consumer client.id is configured according to the number of clients to create using the mp.messaging.incoming.[channel].partitions property.
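For instance, a sketch of a channel consuming three partitions concurrently with a recognizable client.id prefix (the channel name and values are assumptions):

mp.messaging.incoming.prices.partitions=3
mp.messaging.incoming.prices.client-id-prefix=price-consumer-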

health-readiness-topic-verification
Deprecated - Whether the readiness check should verify that topics exist on the broker. Default is false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.
Type: boolean | Mandatory: false

health-readiness-timeout
Deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) of the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.
Type: long | Mandatory: false

health-topic-verification-enabled
Whether the startup and readiness check should verify that topics exist on the broker. Default is false. Enabling it requires an admin client connection.
Type: boolean | Mandatory: false | Default: false

health-topic-verification-timeout
During the startup and readiness health checks, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) of the retrieval. If exceeded, the channel is considered not-ready.
Type: long | Mandatory: false

tracing-enabled
Whether tracing is enabled (default) or disabled.
Type: boolean | Mandatory: false

client-id-prefix
Prefix for the Kafka client client.id attribute. If defined, the configured or generated client.id will be prefixed with the given value; otherwise kafka-consumer- is the prefix.
Type: string | Mandatory: false

checkpoint.state-store
While using the checkpoint commit-strategy, the name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.StateStore.Factory to specify the state store implementation.
Type: string | Mandatory: false

checkpoint.state-type
While using the checkpoint commit-strategy, the fully qualified type name of the state object to persist in the state store. When provided, it can be used by the state store implementation to help persisting the processing state object.
Type: string | Mandatory: false

checkpoint.unsynced-state-max-age.ms
While using the checkpoint commit-strategy, specify the max age in milliseconds that the processing state must be persisted before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.
Type: int | Mandatory: false | Default: 10000

cloud-events
Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata.
Type: boolean | Mandatory: false

kafka-configuration
Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.
Type: string | Mandatory: false

topics
A comma-separated list of topics to be consumed. Cannot be used with the topic or pattern properties.
Type: string | Mandatory: false

pattern
Indicate that the topic property is a regular expression. Must be used with the topic property. Cannot be used with the topics property.
Type: boolean | Mandatory: false | Default: false

key.deserializer
The deserializer classname used to deserialize the record's key.
Type: string | Mandatory: false | Default: org.apache.kafka.common.serialization.StringDeserializer

lazy-client
Whether the Kafka client is created lazily or eagerly.
Type: boolean | Mandatory: false | Default: false

value.deserializer
The deserializer classname used to deserialize the record's value.
Type: string

fetch.min.bytes
The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or when the fetch request times out waiting for data to arrive.
Type: int | Mandatory: false

group.id
A unique string that identifies the consumer group the application belongs to. If not set, it defaults to the application name as set with the quarkus.application.name configuration property. If that property is not set either, a generated unique id is used. It is recommended to always define a group.id; the automatic generation is only a convenience for development mode. You can explicitly ask for an automatically generated unique id by setting this attribute to ${quarkus.uuid}.
Type: string | Mandatory: false

enable.auto.commit
If enabled, the consumer's offset is periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended NOT to enable this setting and to let Reactive Messaging handle the commit.
Type: boolean | Mandatory: false | Default: false

retry
Whether the connection to the broker is re-attempted in case of failure.
Type: boolean | Mandatory: false

retry-attempts
The maximum number of reconnections before failing. -1 means infinite retry.
Type: int | Mandatory: false

retry-max-wait
The maximum delay (in seconds) between two reconnects.
Type: int | Mandatory: false

broadcast
Whether the Kafka records should be dispatched to multiple consumers.
Type: boolean | Mandatory: false | Default: false

auto.offset.reset
What to do when there is no initial offset in Kafka. Accepted values are earliest, latest and none.
Type: string | Mandatory: false | Default: latest

failure-strategy
Specify the failure strategy to apply when a message produced from a record is negatively acknowledged (nack). Values can be fail (default), ignore, or dead-letter-queue.
Type: string | Mandatory: false

commit-strategy
Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be latest, ignore or throttled. If enable.auto.commit is true, the default is ignore, otherwise it is throttled.
Type: string | Mandatory: false

throttled.unprocessed-record-max-age.ms
While using the throttled commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.
Type: int | Mandatory: false | Default: 60000

dead-letter-queue.topic
When the failure-strategy is set to dead-letter-queue, indicates on which topic the record is sent. Defaults to dead-letter-topic-$channel.
Type: string | Mandatory: false

dead-letter-queue.key.serializer
When the failure-strategy is set to dead-letter-queue, indicates the key serializer to use. If not set, the serializer associated with the key deserializer is used.
Type: string | Mandatory: false

dead-letter-queue.value.serializer
When the failure-strategy is set to dead-letter-queue, indicates the value serializer to use. If not set, the serializer associated with the value deserializer is used.
Type: string | Mandatory: false

partitions
The number of partitions to consume concurrently. The connector creates the specified amount of Kafka consumers. It should match the number of partitions of the targeted topic.
Type: int | Mandatory: false

requests
When partitions is greater than 1, this attribute allows configuring how many records are requested by each consumer every time.
Type: int | Mandatory: false

consumer-rebalance-listener.name
The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener. If set, this rebalance listener is applied to the consumer.
Type: string | Mandatory: false

key-deserialization-failure-handler
The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failures happening when deserializing keys are delegated to this handler, which may retry or provide a fallback value.
Type: string | Mandatory: false

value-deserialization-failure-handler
The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failures happening when deserializing values are delegated to this handler, which may retry or provide a fallback value.
Type: string | Mandatory: false

fail-on-deserialization-failure
When no deserialization failure handler is set and a deserialization failure happens, report the failure and mark the application as unhealthy. If set to false and a deserialization failure happens, a null value is forwarded.
Type: boolean | Mandatory: false

graceful-shutdown
Whether a graceful shutdown should be attempted when the application terminates.
Type: boolean | Mandatory: false

poll-timeout
The polling timeout in milliseconds. When polling records, the poll waits at most that duration before returning records. Default is 1000ms.
Type: int | Mandatory: false

pause-if-no-requests
Whether the polling must be paused when the application does not request items, and resumed when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but it will not retrieve any records when paused.
Type: boolean | Mandatory: false

batch
Whether the Kafka records are consumed in batch. The channel injection point must consume a compatible type, such as List<Payload> or KafkaRecordBatch<Payload>.
Type: boolean | Mandatory: false | Default: false

max-queue-size-factor
Multiplier factor to determine the maximum number of records queued for processing, using max.poll.records * max-queue-size-factor. Defaults to 2. In batch mode, max.poll.records is considered 1.
Type: int | Mandatory: false
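Tying several of the consumer attributes above together, an incoming channel could be configured as follows. This is only an illustration: the channel, group and topic names are assumptions.

mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.topic=orders
mp.messaging.incoming.orders.group.id=order-service
mp.messaging.incoming.orders.auto.offset.reset=earliest
mp.messaging.incoming.orders.commit-strategy=throttled
mp.messaging.incoming.orders.failure-strategy=dead-letter-queue
mp.messaging.incoming.orders.dead-letter-queue.topic=orders-dlq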

client-id-prefix
Prefix for the Kafka client client.id attribute. If defined, the configured or generated client.id will be prefixed with the given value; otherwise kafka-producer- is the prefix.
Type: string | Mandatory: false

buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
Type: long | Mandatory: false | Default: 33554432

close-timeout
The amount of milliseconds to wait for a graceful shutdown of the Kafka producer.
Type: int | Mandatory: false | Default: 10000

cloud-events
Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata.
Type: boolean | Mandatory: false

cloud-events-data-content-type
(cloud-events-default-data-content-type)
Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself.
Type: string | Mandatory: false

cloud-events-data-schema
(cloud-events-default-data-schema)
Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself.
Type: string | Mandatory: false

cloud-events-insert-timestamp
(cloud-events-default-timestamp)
Whether the connector should automatically insert the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself.
Type: boolean | Mandatory: false

cloud-events-mode
The Cloud Event mode (structured or binary (default)). Indicates how Cloud Events are written in the outgoing record.
Type: string | Mandatory: false | Default: binary

cloud-events-source
(cloud-events-default-source)
Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself.
Type: string | Mandatory: false

cloud-events-subject
(cloud-events-default-subject)
Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself.
Type: string | Mandatory: false

cloud-events-type
(cloud-events-default-type)
Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself.
Type: string | Mandatory: false

health-enabled
Whether health reporting is enabled (default) or disabled.
Type: boolean | Mandatory: false

health-readiness-enabled
Whether readiness health reporting is enabled (default) or disabled.
Type: boolean | Mandatory: false

health-readiness-timeout
Deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) of the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.
Type: long | Mandatory: false

health-readiness-topic-verification
Deprecated - Whether the readiness check should verify that topics exist on the broker. Default is false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.
Type: boolean | Mandatory: false

health-topic-verification-enabled
Whether the startup and readiness check should verify that topics exist on the broker. Default is false. Enabling it requires an admin client connection.
Type: boolean | Mandatory: false | Default: false

health-topic-verification-timeout
During the startup and readiness health checks, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) of the retrieval. If exceeded, the channel is considered not-ready.
Type: long | Mandatory: false

kafka-configuration
Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.
Type: string | Mandatory: false

key
The key used when writing the record.
Type: string | Mandatory: false

key-serialization-failure-handler
The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing keys are delegated to this handler, which may retry or provide a fallback value.
Type: string | Mandatory: false

key.serializer
The serializer classname used to serialize the record's key.
Type: string | Mandatory: false | Default: org.apache.kafka.common.serialization.StringSerializer

lazy-client
Whether the Kafka client is created lazily or eagerly.
Type: boolean | Mandatory: false | Default: false

max-inflight-messages
The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 to remove the limit.
Type: long | Mandatory: false

merge
Whether the connector should allow multiple upstreams.
Type: boolean | Mandatory: false | Default: false

partition
The target partition id. Set to -1 to let the client determine the partition.
Type: int | Mandatory: false

propagate-headers
A comma-separated list of incoming record headers to be propagated to the outgoing record.
Type: string | Mandatory: false

propagate-record-key
Whether the incoming record key is propagated to the outgoing record.
Type: boolean | Mandatory: false | Default: false

retries
If set to a positive number, the connector tries to resend any record that was not delivered successfully (because of a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms.
Type: long | Mandatory: false | Default: 2147483647

topic
The consumed / populated Kafka topic. If neither this property nor the topics property is set, the channel name is used.
Type: string | Mandatory: false

tracing-enabled
Whether tracing is enabled (default) or disabled.
Type: boolean | Mandatory: false

value-serialization-failure-handler
The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing values are delegated to this handler, which may retry or provide a fallback value.
Type: string | Mandatory: false

value.serializer
The serializer classname used to serialize the payload.
Type: string

waitForWriteCompletion
Whether the client waits for Kafka to acknowledge the written record before acknowledging the message.
Type: boolean | Mandatory: false
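Similarly, an outgoing channel combining some of the producer attributes above could look like the following sketch; the channel and topic names are illustrative assumptions.

mp.messaging.outgoing.orders-out.connector=smallrye-kafka
mp.messaging.outgoing.orders-out.topic=orders
mp.messaging.outgoing.orders-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.outgoing.orders-out.waitForWriteCompletion=true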

26.3. Kafka configuration resolution

Quarkus exposes all Kafka-related application properties, prefixed with kafka. or KAFKA_, under the default-kafka-broker name. This configuration is used to establish the connection with the Kafka broker.

In addition to this default configuration, you can configure the name of a Map producer using the kafka-configuration attribute:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

In this case, the connector looks for the Map associated with the my-configuration name. If kafka-configuration is not set, an additional lookup is done for a Map associated with the channel name (my-channel in the previous example).

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
26.4. Conditionally configure channels

You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.

To achieve this, you need:

Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

Use the @IfBuildProfile("my-profile") annotation on the CDI beans containing the @Incoming(channel) and @Outgoing(channel) annotations that only need to be enabled when the profile is enabled.

Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

Note that this approach can also be used to change the channel configuration based on a profile.
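As a sketch of the two steps above (the profile name my-profile comes from the text; the channel and topic names are assumptions):

%my-profile.mp.messaging.incoming.orders.connector=smallrye-kafka
%my-profile.mp.messaging.incoming.orders.topic=orders

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.arc.profile.IfBuildProfile;

@ApplicationScoped
@IfBuildProfile("my-profile")
public class ProfiledOrderConsumer {

    @Incoming("orders")
    void consume(String order) {
        // only part of the application when 'my-profile' is enabled
    }
}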

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;           (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {  (2)
        return emitter.send(payload);                    (3)
    }
}

The endpoint returns a CompletionStage, indicating the asynchronous nature of the method. The emitter.send method returns a CompletionStage<Void>. The returned future is completed when the message has been written to Kafka. If the write fails, the returned CompletionStage completes exceptionally.

If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures would not be reported to the user.

If you need to send a Kafka record, use:

package org.acme;
import java.util.concurrent.CompletionStage;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import io.smallrye.reactive.messaging.kafka.Record;
@Path("/")
public class ResourceSendingToKafka {
    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)
    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
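The example above assumes an outgoing channel named kafka. A minimal matching configuration sketch could be the following; when the topic is not set, it defaults to the channel name, and the key serializer defaults to StringSerializer:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
# topic defaults to the channel name ('kafka') when not set
mp.messaging.outgoing.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer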
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
As we are writing to a database, we must run inside a transaction. This annotation starts a new transaction and commits it when the method returns. Quarkus automatically considers the method as blocking. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus invokes the method on a worker thread you can block (and not on an I/O thread).
The method receives each Fruit. Note that you need a deserializer to reconstruct the Fruit instances from the Kafka records.
Persist the received fruit object.
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

And the associated configuration:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

See [Serializing via Jackson] for more detail about the usage of Jackson with Kafka. You can also use Avro.

27.3. Persisting Kafka messages with Hibernate Reactive

To persist objects received from Kafka into a database, you can use Hibernate Reactive with Panache.

Let's imagine you receive Fruit objects. For simplicity, our Fruit class is pretty simple:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {
    public String name;
}
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }
}
Hibernate Reactive Session and Panache APIs require an active CDI Request context.
@ActivateRequestContext annotation creates a new request context and destroys it when the Uni returned from the method completes.
If Panache is not used, Mutiny.SessionFactory can be injected and used similarly, without the need of activating the request context or closing the session manually.
Requests a new transaction. The transaction completes when the passed action completes.
Persist the entity. It returns a Uni<Fruit>.
Switch back to a Uni<Void>.
Close the session - this closes the connection with the database. The connection can then be recycled.

Unlike with classic Hibernate, you can’t use @Transactional. Instead, we use session.withTransaction and persist our entity. The map is used to return a Uni<Void> and not a Uni<Fruit>.

You need a deserializer that can create a Fruit instance from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

And the associated configuration:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

See [Serializing via Jackson] for more detail about the usage of Jackson with Kafka. You can also use Avro.

27.4. Writing entities managed by Hibernate to Kafka

Let's imagine the following process:

Because we write to the database, we must run the method in a transaction. Yet, sending the entity to Kafka happens asynchronously. The operation returns a CompletionStage (or a Uni if you use a MutinyEmitter) reporting when the operation completes. We must be sure that the transaction is still running until the object is written. Otherwise, you may access the object outside the transaction, which is not allowed.

To implement this process, you need the following approach:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<FruitDto> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(new FruitDto(fruit));                       (3)
    }
}
The method receives the fruit instance to persist. It returns a CompletionStage which is used for the transaction demarcation. The transaction is committed when the returned CompletionStage completes. In our case, it's when the message is written to Kafka.
Wrap the managed entity inside a Data transfer object and send it to Kafka.
This makes sure that the managed entity is not impacted by the Kafka serialization.
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
Inject a MutinyEmitter, which exposes a Mutiny API. It simplifies the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and written to Kafka).
We need to write the entity into the database in a transaction.
Once the persist operation completes, we send the entity to Kafka. The send method returns a Uni<Void>.
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}
This alternative is a bit more complex, as in addition to sending the fruit instances coming from Kafka, we need to send a periodic ping. To achieve this, we merge the stream coming from Kafka with a periodic stream emitting {} every 10 seconds.

27.7. Chaining Kafka Transactions with Hibernate Reactive transactions

By chaining a Kafka transaction with a Hibernate Reactive transaction you can send records to a Kafka transaction, perform database updates and commit the Kafka transaction only if the database transaction is successful.

The following example demonstrates:

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            }).replaceWithVoid();
        });
    }
}
Inject a KafkaTransactions which exposes a Mutiny API. It allows the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
Limit the concurrency of the HTTP endpoint to "1", preventing starting multiple transactions at a given time.
The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and the Kafka transaction is committed).
Begin a Kafka transaction.
Send the payload to Kafka inside the Kafka transaction.
Persist the entity into the database in a Hibernate Reactive transaction.
Once the persist operation completes, and there are no errors, the Kafka transaction is committed. The result is omitted and returned as the HTTP response.

In the previous example the database transaction (inner) will commit followed by the Kafka transaction (outer). If you wish to commit the Kafka transaction first and the database transaction second, you need to nest them in the reverse order.

The next example demonstrates that using the Hibernate Reactive API (without Panache):

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
@Path("/")
public class FruitProducer {
    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;
    @Inject Mutiny.SessionFactory sf; (1)
    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO
kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

This configuration can be global (as above) or set in the channel configuration:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

29.2. Red Hat OpenShift Streams for Apache Kafka

Red Hat OpenShift Streams for Apache Kafka provides managed Kafka brokers. First, follow the instructions from Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka to create your Kafka broker instance. Make sure you copied the client id and client secret associated with the ServiceAccount you created.

Then, you can configure the Quarkus application to connect to the broker as follows:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)

Red Hat OpenShift Service Registry provides fully managed service registry for handling Kafka schemas.

You can follow the instructions from Getting started with Red Hat OpenShift Service Registry, or use the rhoas CLI to create a new service registry instance:

rhoas service-registry create --name my-schema-registry

Make sure to note the Registry URL of the instance created. For authentication, you can use the same ServiceAccount you created previously. You need to make sure that it has the necessary permissions to access the service registry.

For example, using the rhoas CLI, you can grant the MANAGER role to the service account:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

Then, you can configure the Quarkus application to connect to the schema registry as follows:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} (1)
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} (2)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} (3)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_SECRET} (4)
The service registry URL, given on the admin console, such as https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2
The OAuth token endpoint URL, such as https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token
The client id (from the service account)
The client secret (from the service account)

29.2.2. Binding Red Hat OpenShift managed services to Quarkus application using the Service Binding Operator

If your Quarkus application is deployed on a Kubernetes or OpenShift cluster with Service Binding Operator and OpenShift Application Services operators installed, configurations necessary to access Red Hat OpenShift Streams for Apache Kafka and Service Registry can be injected to the application using Kubernetes Service Binding.

In order to set up the Service Binding, you need first to connect OpenShift managed services to your cluster. For an OpenShift cluster you can follow the instructions from Connecting a Kafka and Service Registry instance to your OpenShift cluster.

Once you’ve connected your cluster with the RHOAS Kafka and Service Registry instances, make sure you’ve granted necessary permissions to the newly created service account.

Then, using the Kubernetes Service Binding extension, you can configure the Quarkus application to generate ServiceBinding resources for those services:

quarkus.kubernetes-service-binding.detect-binding-resources=true
quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka
quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

For this example Quarkus build will generate the following ServiceBinding resources:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

You can follow Deploying to OpenShift to deploy your application, including generated ServiceBinding resources. The configuration properties necessary to access the Kafka and Schema Registry instances will be injected to the application automatically at deployment.

This guide has shown how you can interact with Kafka using Quarkus. It utilizes Quarkus Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.
