添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

Unable to send Kafka Avro Message to Message Channel <Failed to convert Generic Message to Outbound Message>

Ask Question

We are trying to push a Kafka notification to the external Kafka Topic by sending the Avro Schema Message to the Message Channel.

On sending the message to the channel, we are getting the below exception:

    Failed to send Message to channel 'DemoChannel'; 
        nested exception is java.lang.IllegalStateException: 
            Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]' to outbound message.
            org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'DemoChannel'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]
            at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)

Below is the configuration for the Kafka Topic and Message channel in application.yml file of the Spring Boot service.

    cloud:
        stream:
          bindings:
            DemoChannel:
              destination: demoTest
              content-type: avro/bytes  
          kafka:
           binder:
            replication-factor: 1
            brokers: ${broker-ip-and-port}
            zkNodes: ${zookeeper-ip-and-port}
            autoCreateTopics: false
            zkConnectionTimeout: 36000

Below is the Message Channel class file:

             import org.springframework.cloud.stream.annotation.Output;
             import org.springframework.messaging.MessageChannel;
             public interface CustomDemoChannel {
                  @Output("DemoChannel")
                  MessageChannel customDemoChannel();

Below is the Producer code trying to send the Avro Message to the Message channel

             //initialized by the autowired CustomDemoChannel variable
             MessageChannel messageChannel ; 
             //DemoChannel is the Avro Generated class file based on the Avro schema file
             //avroSchemaObject is constructed and initialized by the inner Builder class of the Avro generated DemoChannel class
             DemoChannel avroSchemaObject; 
             //Message to be published is built with payload
             Message<DemoChannel> message = MessageBuilder.withPayload(avroSchemaObject).build();
             //Sending the message to the message channel
             messageChannel.send(message);

How can I resolve this exception?

Do you provide an AVRO message converter? There is no AVRO converters that come by default when seeing the content type to avro/bytes. You might want to look into that. If there is a simple app where we can reproduce the issue, we can suggest some alternatives. – sobychacko May 3, 2022 at 14:50

Thanks Soby!

The issue was resolved by making an additional change: i.e defining an AvroSchemaConverter and including this as a resource in the existing Kafka Producer class.

Step 1: Define your AvroSchemaConverter

@Configuration
public class AvroNoSchemaRegistryConfiguration {
    public static final String CONVERTER_DEMO = "DemoConverter";
    @Bean(name = CONVERTER_DEMO)
    public AvroSchemaMessageConverter demoConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
        return converter;

Step 2: Use the AvroSchemaConverter as a resource in your producer class file like below:

@Resource(name = AvroNoSchemaRegistryConfiguration.CONVERTER_DEMO)
private AvroSchemaMessageConverter converterDemo;

The resource will be autowired by the Spring Cloud Stream jars and the message gets converted into the Avro format at runtime allowing the message to get published to the topic.

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.