开发spring-kafka

自定义 consumerFactory

@Configuration
public class KafkaConfig {
  @Autowired private KafkaProperties kafkaProperties;
  /** 名为test的topic 传输的是字符串. */
  public static final String TOPIC_TEST = "test";
  /** 名为model的topic 传输的是json. */
  public static final String TOPIC_MODEL = "model";
  /** 自定义接受json的listener. */
  public static final String KAFKA_JSON_LISTENER_CONTAINER_FACTORY =
      "kafkaJsonListenerContainerFactory";
  @Bean(name = KAFKA_JSON_LISTENER_CONTAINER_FACTORY)
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
      kafkaJsonListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.getContainerProperties().setIdleEventInterval(60000L * 60);
    factory.getContainerProperties().setPollTimeout(10000);
    return factory;
  /** */
  private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>(20);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    /** groupId */
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
    /** 消费者是否自动提交偏移 量,默认值是 true */
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    /** 服务器从每个分区里返回给消费者的最大字节数 默认值是 1MB */
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "512000");
    /** 消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s */
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 35000);
    /** 指定 broker 的等待时间,默认是 500ms */
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 35000);
    /** 请求超时配置 */
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
    /** 多长时间自动提交一次 */
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    /** 控制单次调用 call() 方法能够返回的记录数量 */
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  // 如果spring.kafka属性不足 则需要自定义ConsumerFactory
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps());

配置文件如下

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: sboot.kfk
      enable-auto-commit: true
      fetch-max-wait: 35000
      auto-commit-interval: 5000
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 4
      poll-timeout: 10000

结果启动报错

Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
The following candidates were found but could not be injected:
	- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory
Action:
Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

说在配置中找不到ConsumerFactory,但是我实实在在的写了这个@Bean了。最后发现源码里面的ConsumerFactory是这样的。

	@Bean
	@ConditionalOnMissingBean(ConsumerFactory.class)
	public ConsumerFactory<?, ?> kafkaConsumerFactory() {
		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());

仔细看,有什么不同?
是 ConsumerFactory<?, ?> 而不是 ConsumerFactory<String, String>。Spring 在注入时是按泛型匹配的:报错信息里的 KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory 方法要求注入一个泛型兼容的 ConsumerFactory,而我们自定义 bean 的泛型是 <String, String>,泛型对不上,所以容器里虽然存在这个 bean("found but could not be injected"),却无法注入,因此报错。以此类推,比如要自定义 RedisTemplate 的时候也要注意泛型,分清 RedisTemplate<Object, Object> 和 RedisTemplate<String, Object> 的区别,先确认注入点到底要求的是哪一个。

结论:
注意自定义spring bean的时候看好泛型是否能对的上

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

  • 浏览量 2136
  • 收藏 0
  • 0

所有评论(0)