@KafkaListener 配置项
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
* 监听的 Topic 数组
* The topics for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* An expression must be resolved to the topic name.
* This uses group management and Kafka will assign partitions to group members.
* Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
* @return the topic names or expressions (SpEL) to listen to.
String[] topics() default {};
* 监听的 Topic 表达式
* The topic pattern for this listener. The entries can be 'topic pattern', a
* 'property-placeholder key' or an 'expression'. The framework will create a
* container that subscribes to all topics matching the specified pattern to get
* dynamically assigned partitions. The pattern matching will be performed
* periodically against topics existing at the time of check. An expression must
* be resolved to the topic pattern (String or Pattern result types are supported).
* This uses group management and Kafka will assign partitions to group members.
* Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
* @return the topic pattern or expression (SpEL).
* @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG
String topicPattern() default "";
* @TopicPartition 注解的数组。每个 @TopicPartition 注解,可配置监听的 Topic、队列、消费的开始位置
* The topicPartitions for this listener when using manual topic/partition
* assignment.
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
TopicPartition[] topicPartitions() default {};
* 消费者分组
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the group id.
* @since 1.3
String groupId() default "";
* 使用消费异常处理器 KafkaListenerErrorHandler 的 Bean 名字
* Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
* name to invoke if the listener method throws an exception.
* @return the error handler.
* @since 1.3
String errorHandler() default "";
* 自定义消费者监听器的并发数,这个我们在 TODO 详细解析。
* Override the container factory's {@code concurrency} setting for this listener. May
* be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
* which case {@link Number#intValue()} is used to obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the concurrency.
* @since 2.2
String concurrency() default "";
* 是否自动启动监听器。默认情况下,为 true 自动启动。
* Set to true or false, to override the default setting in the container factory. May
* be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
* a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
* obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return true to auto start, false to not auto start.
* @since 2.2
String autoStartup() default "";
* Kafka Consumer 拓展属性。
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
String[] properties() default {};
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* <p>Note: When provided, this value will override the group id property
* in the consumer factory configuration, unless {@link #idIsGroup()}
* is set to false.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
String id() default "";
* id 唯一标识的前缀
* When provided, overrides the client id property in the consumer factory
* configuration. A suffix ('-n') is added for each container instance to ensure
* uniqueness when concurrency is used.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the client id prefix.
* @since 2.1.1
String clientIdPrefix() default "";
* 当 groupId 未设置时,是否使用 id 作为 groupId
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
boolean idIsGroup() default true;
* 使用的 KafkaListenerContainerFactory Bean 的名字。
* 若未设置,则使用默认的 KafkaListenerContainerFactory Bean 。
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.
String containerFactory() default "";
* 所属 MessageListenerContainer Bean 的名字。
* If provided, the listener container for this listener will be added to a bean
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
* This allows, for example, iteration over the collection to start/stop a subset
* of containers.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the bean name for the group.
String containerGroup() default "";
* 真实监听容器的 Bean 名字,需要在名字前加 "__" 。
* A pseudo bean name used in SpEL expressions within this annotation to reference
* the current bean within which this listener is defined. This allows access to
* properties and methods within the enclosing bean.
* Default '__listener'.
* Example: {@code topics = "#{__listener.topicList}"}.
* @return the pseudo bean name.
* @since 2.1.2
String beanRef() default "__listener";
Java Concurrency
ConcurrencyThe issue that can make a difference is blocking. If one task in your program is unable to continuebecause of some condition outside of the control of the program(typically I/O), we say that the taskor the thread blocks. Without concurrency, the whole program comes to a stop until the ext Read More