At the enterprise level, applications commonly rely on messaging to communicate. A middleware layer sits between these applications and acts as a Message Bus that enables them to work together.

Messaging Bus Architecture

One of the most used messaging solutions is Apache Kafka: Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies. Kafka clusters can be deployed on bare metal or in the cloud.

Microsoft provides a great Azure Service for Kafka customers: Azure Event Hubs 🤩

Azure Event Hubs is a fully managed, real-time data ingestion service that’s simple, trusted, and scalable. It can stream millions of events per second from any source, so you can build dynamic data pipelines and respond immediately to business challenges. Azure Event Hubs keeps processing data during emergencies using its geo-disaster recovery and geo-replication features.

Azure Event Hubs allows existing Apache Kafka clients and applications to talk to Event Hubs without any code changes—you get a managed Kafka experience without having to manage your own clusters.

In this tutorial, I will build two small Spring Boot applications that communicate through Azure Event Hubs.

The source code of the sample application that we will be developing in this post is available on GitHub.

Creating an Azure Event Hubs namespace

First of all, we need to start by creating the Event Hubs namespace:

  • Name: used to build the Event Hubs namespace URL
  • Pricing Tier: Standard - Kafka support is enabled for the Standard and Dedicated pricing tiers, not for Basic.

Creating the Azure Event Hubs

Authorizing access to the Azure Event Hubs namespace

Each Event Hubs namespace and each Event Hubs entity (an event hub instance or a Kafka topic) has a shared access authorization policy made up of rules. The policy at the namespace level applies to all entities inside the namespace, irrespective of their individual policy configuration. For each authorization policy rule, you decide on three pieces of information: name, scope, and rights. The name is a unique name in that scope. The scope is the URI of the resource in question. For an Event Hubs namespace, the scope is the fully qualified domain name (FQDN), such as https://<yournamespace>.servicebus.windows.net/.

The rights provided by the policy rule can be a combination of:

  • Send – Gives the right to send messages to the entity
  • Listen – Gives the right to listen to, or receive from, the entity
  • Manage – Gives the right to manage the topology of the namespace, including creation and deletion of entities
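
To make the three pieces of a rule (name, scope, rights) more concrete, here is a minimal sketch that models a rule as a plain Java class. The class SharedAccessRule and its allows method are hypothetical names invented for this illustration; they are not part of any Azure SDK. The rule name and namespace URL are the ones used later in this post.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of a shared access policy rule (illustration only, not an Azure SDK type).
final class SharedAccessRule {

    enum Right { SEND, LISTEN, MANAGE }

    private final String name;       // unique within the scope
    private final String scope;      // URI of the resource the rule applies to
    private final Set<Right> rights; // combination of Send, Listen and Manage

    SharedAccessRule(String name, String scope, Set<Right> rights) {
        this.name = name;
        this.scope = scope;
        // Defensive copy so later changes to the caller's set do not affect the rule.
        this.rights = EnumSet.noneOf(Right.class);
        this.rights.addAll(rights);
    }

    // True when the rule grants the given right.
    boolean allows(Right right) {
        return rights.contains(right);
    }

    @Override
    public String toString() {
        return name + " on " + scope + " " + rights;
    }

    public static void main(String[] args) {
        SharedAccessRule rule = new SharedAccessRule(
                "SendReceiveOnly",
                "https://nebrass.servicebus.windows.net/",
                EnumSet.of(Right.SEND, Right.LISTEN));
        System.out.println(rule);
        System.out.println("Can send:   " + rule.allows(Right.SEND));
        System.out.println("Can manage: " + rule.allows(Right.MANAGE));
    }
}
```

A rule limited to Send and Listen is enough for an application that only produces and consumes messages; Manage is only needed to create or delete entities.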

In our case, we need to create a Shared Access Authorization Policy that grants the Send and Listen rights on our Event Hubs namespace:

Creating the Azure Event Hubs Shared Access Policy

Creating an Azure Event Hub

Next, in my nebrass namespace, I will create a new Event Hub called topic-exchange 🥳

Creating an Azure Event Hub

Generating our sample application project

As usual, we will generate our project using the Spring Initializr. Our application will have 3 dependencies:

  • Web
  • Kafka
  • Lombok

Generating the project skeleton on Spring Initializr

As you see here, there is no specific Azure dependency or specific library.

In our example, we will have an application that both produces and consumes Kafka messages. In many tutorials you will find separate producer and consumer applications, but here I wanted both features in the same application, which is a real-world use case. ⚠️ Nothing prevents a producer application from also listening to the topic it writes to 😅

Now we will be creating our Kafka application 😁

Let’s start by renaming the application.properties to application.yaml - I like to use YAML 😁 and we will insert the first value inside, which is the name of the Topic we want to write to:

topic:
  name: exchange-topic

This topic name can be injected into the code using:

@Value("${topic.name}")
private String topicName;

Now, let’s define the structure of the Kafka message that we will be producing and consuming. Our message will be a small POJO with a single String attribute, body:

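import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

// Simple POJO carried as the Kafka message payload
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class SimpleMessage {
    private String body;
}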

With the message, we need to add a JSON Serializer, which obviously 🤣 will be used to Serialize our message to JSON 🥶

// JsonSerializer comes from org.springframework.kafka.support.serializer
public class ProducerMessageSerializer extends JsonSerializer<SimpleMessage> {
}

Next, we can now create our Kafka producer, which will be a Spring Service:

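import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, SimpleMessage> kafkaTemplate;

    @Value("${topic.name}")
    private String topicName;

    public void send(SimpleMessage message) {
        // send() is asynchronous: the log below only confirms the message was handed to the template
        this.kafkaTemplate.send(topicName, message);
        log.info("Published the message [{}] to the kafka queue: [{}]",
                message.getBody(),
                topicName);
    }
}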

But where will the data be sent? 😱 We need to go back to the application.yaml file to add some (many) configuration details: 🤓

spring:
  kafka:
    bootstrap-servers: nebrass.servicebus.windows.net:9093
    client-id: first-service
    properties:
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://nebrass.servicebus.windows.net/;SharedAccessKeyName=SendReceiveOnly;SharedAccessKey=XXXX";
      sasl.mechanism: PLAIN
      security.protocol: SASL_SSL
    producer:
      value-serializer: com.targa.labs.dev.kafkaonazure.ProducerMessageSerializer
topic:
  name: exchange-topic

Wow 😱 Many items were added to the file. Keep calm, it’s a very simple configuration:

  • spring.kafka.bootstrap-servers: Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster 👉 This value is derived from the Event Hubs namespace name, in my case it’s: nebrass.servicebus.windows.net:9093
  • spring.kafka.client-id: ID to pass to the server when making requests. Used for server-side logging 👉 first-service
  • spring.kafka.properties.sasl.mechanism: PLAIN 👉 PLAIN (also known as SASL/PLAIN) is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
  • spring.kafka.properties.security.protocol: SASL_SSL 👉 this property ensures that all broker/client communication is encrypted with TLS and authenticated using SASL (here, SASL/PLAIN)
  • spring.kafka.properties.sasl.jaas.config: The JAAS configuration property that describes how clients such as the producer and the consumer connect to the Kafka brokers. The username and password properties define the user for client connections. In our example, clients connect to the broker with the literal username "$ConnectionString", and the password is our Azure Event Hubs connection string, which in our case is the connection string of our Shared Access Authorization Policy. We use org.apache.kafka.common.security.plain.PlainLoginModule as the login module implementation, which provides the username as the public credential and the password as the private credential 👉 so our property value will be: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://nebrass.servicebus.windows.net/;SharedAccessKeyName=SendReceiveOnly;SharedAccessKey=XXXX";
  • spring.kafka.producer.value-serializer: Serializer class for values 👉 com.targa.labs.dev.kafkaonazure.ProducerMessageSerializer
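
The connection-related values above all follow from just two inputs: the namespace name and the policy connection string. The sketch below shows that derivation in plain Java, using the raw Kafka client property names. The class EventHubsKafkaSettings and its methods are hypothetical helpers written for this illustration; they are not part of the sample project, Spring, or any Azure library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (illustration only) that derives Kafka client settings for Event Hubs.
final class EventHubsKafkaSettings {

    // The Kafka endpoint used in this post is port 9093 on the namespace host.
    static String bootstrapServers(String namespace) {
        return namespace + ".servicebus.windows.net:9093";
    }

    // The username is the literal text $ConnectionString; the password is the connection string itself.
    static String jaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }

    // Collects the connection-related properties described in the list above.
    static Map<String, String> clientProperties(String namespace, String connectionString) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers(namespace));
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(connectionString));
        return props;
    }

    public static void main(String[] args) {
        String connectionString = "Endpoint=sb://nebrass.servicebus.windows.net/;"
                + "SharedAccessKeyName=SendReceiveOnly;SharedAccessKey=XXXX";
        clientProperties("nebrass", connectionString)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real project, the connection string is a secret, so it would typically be supplied from outside the source tree (for example through an environment variable) rather than hard-coded.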

Exposing the Kafka producer

We will publish Kafka messages whose content is received through a REST API. For this, we will create a new RestController:

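import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class KafkaSender {
    private final KafkaProducer kafkaProducer;

    // POST /api/send: forwards the JSON request body to the Kafka producer
    @PostMapping("send")
    public void sendData(@RequestBody SimpleMessage message) {
        this.kafkaProducer.send(message);
    }
}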

We can send some messages using Postman/Insomnia or even via command line, for example, using cURL:

curl -d '{"body": "Hello there !"}' -H "Content-Type: application/json" -X POST http://localhost:8080/api/send
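
If you prefer to stay in Java, the same request can be built with the JDK's built-in HTTP client (java.net.http, Java 11+). This is only a sketch: the class SendMessageRequest and its methods are hypothetical names, and the JSON escaping is deliberately minimal (backslashes and double quotes only), which is enough for simple text like the example message.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper (illustration only) that builds the same POST request as the cURL command.
final class SendMessageRequest {

    // Minimal JSON escaping for this sketch: handles backslashes and double quotes only.
    static String toJson(String body) {
        String escaped = body.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"body\": \"" + escaped + "\"}";
    }

    // Builds (but does not send) a POST to <baseUrl>/api/send with a JSON payload.
    static HttpRequest build(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(body)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "Hello there !");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it while the application is running:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```

The request is only built here, not sent, so the sketch runs even when the Spring Boot application is not started.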

Creating the Kafka consumer

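Just like the Kafka producer, our Kafka consumer will be a classic Spring Service, with a method annotated with @KafkaListener that listens to our Event Hub:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {
    // Invoked for every message received on the configured topic
    @KafkaListener(topics = "${topic.name}")
    public void receive(SimpleMessage consumerMessage) {
        log.info("Received message from kafka queue: {}", consumerMessage.getBody());
    }
}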

So simple! 😁 But this will not work unless we add the Spring Kafka consumer configuration to our application.yaml file:

spring:
  kafka:
    consumer:
      group-id: $Default
      properties:
        spring.json: