At the enterprise level, it’s common for applications to communicate through messaging. This is done using middleware that sits between these applications as a Message Bus, enabling them to work together.
One of the most widely used messaging solutions is Apache Kafka: Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies. Kafka clusters can be deployed on bare metal or in the cloud.
Microsoft provides a great Azure Service for Kafka customers: Azure Event Hubs 🤩
Azure Event Hubs is a fully managed, real-time data ingestion service that’s simple, trusted, and scalable. It can stream millions of events per second from any source, letting you build dynamic data pipelines and respond to business challenges immediately. Azure Event Hubs keeps processing data during emergencies thanks to its geo-disaster recovery and geo-replication features.
Azure Event Hubs allows existing Apache Kafka clients and applications to talk to Event Hubs without any code changes—you get a managed Kafka experience without having to manage your own clusters.
In this tutorial, I will build a small Spring Boot application whose producer and consumer sides communicate through Azure Event Hubs.
The source code of the sample application that we will be developing in this post is available on GitHub.
Creating an Azure Event Hubs namespace
First of all, we need to create the Event Hubs namespace:
- Name will be used to create the Event Hubs namespace URL
- Pricing Tier: Standard. Kafka support is enabled for the Standard and Dedicated pricing tiers only.
Authorizing access to the Azure Event Hubs namespace
Each Event Hubs namespace and each Event Hubs entity (an event hub instance or a Kafka topic) has a shared access authorization policy made up of rules. The policy at the namespace level applies to all entities inside the namespace, irrespective of their individual policy configuration. For each authorization policy rule, you decide on three pieces of information: name, scope, and rights. The name is a unique name in that scope. The scope is the URI of the resource in question. For an Event Hubs namespace, the scope is the fully qualified domain name (FQDN), such as https://<yournamespace>.servicebus.windows.net/.
The rights provided by the policy rule can be a combination of:
- Send – Gives the right to send messages to the entity
- Listen – Gives the right to listen to or receive from the entity
- Manage – Gives the right to manage the topology of the namespace, including creation and deletion of entities
In our case, we need to create a Shared Access Authorization Policy with both Send and Listen rights on our Event Hubs namespace:
Creating an Azure Event Hub
Next, in my nebrass namespace, I will create a new Event Hub called topic-exchange 🥳
Generating our sample application project
As usual, we will generate our project using the Spring Initializr. Our application will have three dependencies:
- Web
- Kafka
- Lombok
As you can see, there is no Azure-specific dependency or library.
In our example, we will have one application that both produces and consumes Kafka messages. In many tutorials you will find separate producer and consumer applications, but here I wanted to have both features in the same application, which is a real-world use case. ⚠️ Nothing forbids the producer application from listening to the same source 😅
Now we will be creating our Kafka application 😁
Let’s start by renaming application.properties to application.yaml (I like to use YAML 😁) and inserting the first value: the name of the topic we want to write to:
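Here is a minimal sketch of that file; the `topic.name` key is my own naming, not something imposed by Spring:

```yaml
# application.yaml
# Custom property holding the name of the Event Hub (Kafka topic) created earlier
topic:
  name: topic-exchange
```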
This topic name can be injected into the code using:
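A short sketch of the injection, using Spring’s @Value annotation with the property key assumed above:

```java
// Injects the "topic.name" property defined in application.yaml
@Value("${topic.name}")
private String topicName;
```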
Now, let’s move on to defining the structure of the Kafka message that we will be producing and consuming. Our Message will be a small POJO with only one String body attribute:
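A plausible shape for that POJO, using the Lombok annotations we pulled in as a dependency:

```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// A minimal message carrying a single String payload
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    private String body;
}
```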
Along with the message, we need to add a JSON serializer, which obviously 🤣 will be used to serialize our message to JSON 🥶
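Here is a minimal sketch of such a serializer, built on Jackson’s ObjectMapper (the class name matches the one referenced in the configuration below):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

// Turns a Message into its JSON byte representation for Kafka
public class ProducerMessageSerializer implements Serializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message message) {
        try {
            return objectMapper.writeValueAsBytes(message);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to serialize Message to JSON", e);
        }
    }
}
```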
Next, we can create our Kafka producer, which will be a Spring service:
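A minimal sketch of that service; the KafkaProducerService name is my assumption, and it simply delegates to Spring’s KafkaTemplate:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Publishes Message objects to the configured Event Hub (Kafka topic)
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, Message> kafkaTemplate;

    @Value("${topic.name}")
    private String topicName;

    public KafkaProducerService(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Message message) {
        kafkaTemplate.send(topicName, message);
    }
}
```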
But where will the data be sent? 😱 We need to go back to the application.yaml file to add some (many) configuration details: 🤓
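Based on the properties detailed below, the file would look roughly like this (replace the SharedAccessKey placeholder with your own):

```yaml
spring:
  kafka:
    bootstrap-servers: nebrass.servicebus.windows.net:9093
    client-id: first-service
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_SSL
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://nebrass.servicebus.windows.net/;SharedAccessKeyName=SendReceiveOnly;SharedAccessKey=XXXX";
    producer:
      value-serializer: com.targa.labs.dev.kafkaonazure.ProducerMessageSerializer
```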
Wow 😱 There are many items added to the list. Keep calm, it’s a very simple configuration:
- spring.kafka.bootstrap-servers: Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster 👉 This value is derived from the Event Hubs namespace name; in my case it’s: nebrass.servicebus.windows.net:9093
- spring.kafka.client-id: ID to pass to the server when making requests, used for server-side logging 👉 first-service
- spring.kafka.properties.sasl.mechanism: PLAIN 👉 PLAIN (also known as SASL/PLAIN) is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
- spring.kafka.properties.security.protocol: SASL_SSL 👉 this property ensures that all broker/client communication is encrypted and authenticated using SASL/PLAIN
- spring.kafka.properties.sasl.jaas.config: Configures the JAAS property that describes how clients like the producer and consumer connect to the Kafka brokers. The username and password properties are used by clients to configure the user for client connections. In our example, clients connect to the broker with the username "$ConnectionString" and, as the password, our Azure Event Hubs connection string, which in our case is the connection string of our Shared Access Authorization Policy. We will be using org.apache.kafka.common.security.plain.PlainLoginModule as the login module implementation, which provides the username as the public credential and the password as the private credential 👉 so our property value will be: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://nebrass.servicebus.windows.net/;SharedAccessKeyName=SendReceiveOnly;SharedAccessKey=XXXX";
- spring.kafka.producer.value-serializer: Serializer class for values 👉 com.targa.labs.dev.kafkaonazure.ProducerMessageSerializer
Exposing the Kafka producer
We will be producing Kafka messages whose content is received from a REST API. For this, we will create a new RestController:
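A minimal sketch of that controller; the MessageController name and the /messages path are my assumptions:

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Receives raw text over HTTP and hands it to the Kafka producer service
@RestController
public class MessageController {

    private final KafkaProducerService producerService;

    public MessageController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping("/messages")
    public void publish(@RequestBody String body) {
        producerService.send(new Message(body));
    }
}
```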
We can send some messages using Postman/Insomnia or even via command line, for example, using cURL:
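For example, assuming the hypothetical /messages endpoint sketched above:

```bash
# Post a plain-text body that will be wrapped into a Kafka message
curl -X POST http://localhost:8080/messages \
     -H "Content-Type: text/plain" \
     -d "Hello Azure Event Hubs!"
```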
Creating the Kafka consumer
Just like the Kafka producer, our Kafka consumer will be a classic Spring service, with a method annotated with @KafkaListener listening on our Event Hub:
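A minimal sketch of the consumer; the KafkaConsumerService name is my assumption, and Lombok’s @Slf4j provides the logger:

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Logs every Message received from the configured Event Hub (Kafka topic)
@Slf4j
@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "${topic.name}")
    public void listen(Message message) {
        log.info("Received: {}", message.getBody());
    }
}
```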
So simple! 😁 But this will not work unless we add the Spring Kafka consumer configuration to our application.yaml file:
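A plausible shape for that consumer block, to be merged under the spring.kafka section shown earlier; the ConsumerMessageDeserializer class name is hypothetical (a JSON counterpart of our serializer), and $Default is Event Hubs’ built-in consumer group:

```yaml
spring:
  kafka:
    consumer:
      group-id: $Default
      value-deserializer: com.targa.labs.dev.kafkaonazure.ConsumerMessageDeserializer
```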