Consider revisiting the entries above or defining a bean of type in your configuration.
Developing with spring-kafka
Custom ConsumerFactory
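import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties kafkaProperties;

    /** Topic named "test"; carries plain strings. */
    public static final String TOPIC_TEST = "test";
    /** Topic named "model"; carries JSON. */
    public static final String TOPIC_MODEL = "model";
    /** Bean name of the custom listener container factory that accepts JSON. */
    public static final String KAFKA_JSON_LISTENER_CONTAINER_FACTORY =
            "kafkaJsonListenerContainerFactory";

    @Bean(name = KAFKA_JSON_LISTENER_CONTAINER_FACTORY)
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
            kafkaJsonListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10);
        factory.setMessageConverter(new StringJsonMessageConverter());
        factory.getContainerProperties().setIdleEventInterval(60000L * 60);
        factory.getContainerProperties().setPollTimeout(10000);
        return factory;
    }

    /** Consumer properties used by the custom ConsumerFactory below. */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(20);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        // Consumer group id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
        // Whether the consumer commits offsets automatically; the default is true.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Maximum number of bytes the server returns per partition; the default is 1 MB.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "512000");
        // How long the consumer may go without heartbeats before it is considered dead.
        // The default depends on the client version (10 s before Kafka 3.0, 45 s since).
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 35000);
        // Maximum time the broker waits before answering a fetch request; the default is 500 ms.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 35000);
        // Request timeout.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        // Interval between automatic offset commits.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // Maximum number of records returned by a single call to poll().
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    // If the spring.kafka properties are not sufficient, define a custom ConsumerFactory.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }
}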
The configuration file is as follows:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: sboot.kfk
      enable-auto-commit: true
      fetch-max-wait: 35000
      auto-commit-interval: 5000
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 4
      poll-timeout: 10000
Startup then fails with the following error:
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
The following candidates were found but could not be injected:
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory
Action:
Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
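It says no ConsumerFactory can be found in the configuration, yet I really did declare that @Bean. In the end I found that the ConsumerFactory in the Spring Boot source looks like this: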
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
Look closely: what is different?
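The auto-configured bean is declared as ConsumerFactory<?, ?>, whereas mine is ConsumerFactory<String, String>: the generics differ. The error report fits this. @ConditionalOnMissingBean(ConsumerFactory.class) matches on the class alone, so my consumerFactory bean makes the default kafkaConsumerFactory back off. But when Spring then resolves parameter 1 of kafkaListenerContainerFactory, it also checks the type arguments, finds that my ConsumerFactory<String, String> does not fit the parameter's declared type (presumably ConsumerFactory<Object, Object>; check the signature in your Spring Boot version), and fails. The same applies when customizing other generic beans such as RedisTemplate: RedisTemplate<Object, Object> and RedisTemplate<String, Object> are different injection targets, so check which one the injection point actually requires.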
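The difference between matching on the class and matching on the full generic type can be reproduced with plain Java reflection, without Spring. The sketch below is only an illustration: the nested ConsumerFactory interface is a hypothetical stand-in for the real one, the ConsumerFactory<Object, Object> parameter is an assumption about what the injection point asks for, and comparing types with equals() is a simplification of Spring's actual assignability check.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class GenericMatchDemo {

    /** Hypothetical stand-in for org.springframework.kafka.core.ConsumerFactory. */
    interface ConsumerFactory<K, V> {}

    /** Shaped like the custom bean method from the post. */
    static ConsumerFactory<String, String> consumerFactory() {
        return new ConsumerFactory<String, String>() {};
    }

    /** Shaped like an injection point asking for ConsumerFactory<Object, Object> (assumed). */
    static void listenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) {}

    /** Full generic type declared by the bean method. */
    static Type declaredType() {
        return method("consumerFactory").getGenericReturnType();
    }

    /** Full generic type required by the injection point. */
    static Type requiredType() {
        return method("listenerContainerFactory", ConsumerFactory.class)
                .getGenericParameterTypes()[0];
    }

    /** Compares only the erased class, the way a by-class condition would. */
    static boolean rawTypesMatch() {
        return rawType(declaredType()).equals(rawType(requiredType()));
    }

    /** Compares the type arguments as well, the way generic-aware injection would. */
    static boolean genericTypesMatch() {
        return declaredType().equals(requiredType());
    }

    private static Type rawType(Type type) {
        return ((ParameterizedType) type).getRawType();
    }

    private static Method method(String name, Class<?>... parameterTypes) {
        try {
            return GenericMatchDemo.class.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("raw types match: " + rawTypesMatch());         // true
        System.out.println("generic types match: " + genericTypesMatch()); // false
    }
}
```

The by-class comparison succeeds, which mirrors the default bean backing off, while the generic comparison fails, which mirrors the injection error at startup.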
Conclusion:
When defining a custom Spring bean, make sure its generic type parameters match what the injection point expects.