In this post I will show you how you can build subscriptions between your OpenFaaS functions and your Apache Kafka topics. I’ll be using Kubernetes to show you around, but the connector-sdk works with any OpenFaaS provider.
OpenFaaS is solving real problems for our end-user community, many of whom now rely on the project in production and for core services. The OpenFaaS Pro kafka-connector was created to help commercial users integrate functions into their existing systems.
OpenFaaS functions and microservices are accessible over HTTP endpoints via the Gateway service, but let’s explore how other events can be used to trigger our functions.
This tutorial describes the Kafka connector which is part of the OpenFaaS Pro bundle.
Find out more
Apache Kafka
According to Datadog:
Apache Kafka is an open-source stream-processing software platform. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a “massively scalable pub/sub message queue designed as a distributed transaction log”.
At a high level, the important concepts are Producers, Consumers and Brokers, which communicate with each other via messages attached to Topics.
A Producer sends messages on specific Topics to a Broker, and Consumers then poll the Brokers for the topics they are interested in. This approach is popular in distributed systems because it decouples services from one another and, because messages are kept in a persistent log, allows them to be replayed or redelivered when required.
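To make the idea of a persistent log concrete, here is a minimal in-memory sketch in Go. It is not a Kafka client: the `Broker` type and its `Produce` and `Poll` methods are hypothetical names invented for this illustration, and a real broker adds partitions, replication and durable storage on top of the same basic idea.

```go
package main

import "fmt"

// Broker is a toy, in-memory stand-in for a Kafka broker (hypothetical type).
// Each topic is an append-only slice of messages.
type Broker struct {
	topics map[string][]string
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{topics: make(map[string][]string)}
}

// Produce appends a message to a topic's log and returns its offset.
func (b *Broker) Produce(topic, msg string) int {
	b.topics[topic] = append(b.topics[topic], msg)
	return len(b.topics[topic]) - 1
}

// Poll returns a copy of every message on the topic from the given offset
// onwards. Reading does not delete anything, so any offset can be read again.
func (b *Broker) Poll(topic string, offset int) []string {
	entries := b.topics[topic]
	if offset < 0 || offset >= len(entries) {
		return nil
	}
	return append([]string(nil), entries[offset:]...)
}

func main() {
	b := NewBroker()
	b.Produce("payment-received", "order-1")
	b.Produce("payment-received", "order-2")

	// A consumer that has already handled offset 0 polls from offset 1.
	fmt.Println(b.Poll("payment-received", 1)) // [order-2]

	// A replay simply starts again from offset 0.
	fmt.Println(b.Poll("payment-received", 0)) // [order-1 order-2]
}
```

Because the consumer chooses the offset, the producer never needs to know who is reading or how far they have got.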
See also: Apache Kafka Documentation.
Kafka Connector
The kafka-connector is designed to connect Kafka topics to OpenFaaS Functions. After deploying the kafka-connector and pointing it at your broker, you can connect functions to topics by adding a simple annotation via your functions’ stack.yml file.
Conceptual architecture diagram:
The connector makes use of the connector-sdk, a Golang library which periodically queries the Gateway’s list of functions and then builds a map between each function and a topic. This map is then used to invoke functions when messages arrive on a given topic.
Each OpenFaaS connector that the community develops can take advantage of this shared functionality and its only responsibility is to read from a data source or queue and forward the message on.
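The mapping step described above can be sketched in a few lines of Go. This is not the connector-sdk's actual code or API: the `Function` type and `BuildTopicMap` are hypothetical names, and the sketch assumes the gateway returns each function's name together with its annotations. In a real connector the map would be rebuilt on a timer rather than once.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Function is a hypothetical, trimmed-down view of what a gateway might
// return for each deployed function.
type Function struct {
	Name        string
	Annotations map[string]string
}

// BuildTopicMap groups function names by the value of their "topic"
// annotation. Functions without the annotation are skipped.
func BuildTopicMap(functions []Function) map[string][]string {
	topicMap := make(map[string][]string)
	for _, fn := range functions {
		topic := strings.TrimSpace(fn.Annotations["topic"])
		if topic == "" {
			continue
		}
		topicMap[topic] = append(topicMap[topic], fn.Name)
	}
	// Sort the names so that the result is deterministic.
	for _, names := range topicMap {
		sort.Strings(names)
	}
	return topicMap
}

func main() {
	functions := []Function{
		{Name: "email-receipt", Annotations: map[string]string{"topic": "payment-received"}},
		{Name: "figlet"}, // no topic annotation, so no subscription
	}

	topicMap := BuildTopicMap(functions)
	fmt.Println(topicMap["payment-received"]) // [email-receipt]
}
```

When a message arrives, the connector only needs to look up the topic in this map to know which functions to invoke.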
See also: Triggers & Events in OpenFaaS
Pre-requisites
For this tutorial we will use Kubernetes. I have chosen minikube for my own local cluster, but alternatives are available.
Install OpenFaaS using the getting started guide.
Install the faas-cli.
Set your OPENFAAS_URL environment variable so that it points at your gateway.
Deploy Apache Kafka
A development version of Apache Kafka has been made available so that you can get this tutorial up and running in a few minutes. You can also customise the connector to use your existing deployment of Kafka or to use a managed provider.
For a self-hosted version, you can use Confluent’s chart:
arkade install kafka
Deploy the connector with helm
Create the required secret with your OpenFaaS Pro license code:
$ kubectl create secret generic \
-n openfaas \
openfaas-license \
--from-file license=$HOME/.openfaas/LICENSE
Add the OpenFaaS charts repository:
$ helm repo add openfaas https://openfaas.github.io/faas-netes/
$ helm repo update
Install the Kafka Connector with default values:
$ export BROKER_HOST=cp-helm-charts-cp-kafka-headless.default:9092
$ helm upgrade kafka-connector openfaas/kafka-connector \
--install \
--namespace openfaas \
--set topics="payment-received" \
--set brokerHost="$BROKER_HOST" \
--set printResponse="true" \
--set printResponseBody="true"
Set topics to the topics you want to subscribe to, as a comma-separated list without spaces.
If you deployed Kafka to a remote location or a different namespace or port, then just update the brokerHost value.
We have now deployed the following components:
Zookeeper
Broker Host with Producer
kafka-connector
Subscribe to a topic
In order to consume topics via the connector we need to apply an annotation with a key of topic, for example: topic: payment-received. This should match at least one of the topics we defined in the earlier step for the connector.
Create a Docker Hub account if you don’t already have one and sign in with docker login.
Create a new function in Go
# Replace with your container registry or Docker Hub account:
$ export OPENFAAS_PREFIX=docker.io/alexellis2
# Create a function in Go
$ faas-cli new email-receipt \
--lang=go
$ mv email-receipt.yml stack.yml
We also renamed the function’s YAML file to stack.yml (the default).
The function is a simple Hello World written in Go. You can edit it if you want, but for simplicity in our example we will keep the default message.
Edit the stack.yml file by adding a topic annotation with the value we configured in the Kafka Connector, payment-received. The file should look like this:
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  email-receipt:
    lang: go
    handler: ./email-receipt
    # ...
    annotations:
      topic: payment-received
Optionally, edit the handler.go file to return a more descriptive message:
package function

import (
	"fmt"
)

// Handle a serverless request
func Handle(req []byte) string {
	return fmt.Sprintf("Email customer in response to event: %s", string(req))
}
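If you want to check what the handler returns before deploying it, a small standalone program is enough. The sketch below copies `Handle` into a `main` package so that it runs on its own; the JSON payload is a made-up example, not something the connector or any producer is known to send.

```go
package main

import "fmt"

// Handle mirrors the handler shown above so that this file runs on its own.
func Handle(req []byte) string {
	return fmt.Sprintf("Email customer in response to event: %s", string(req))
}

func main() {
	// Hypothetical payload, standing in for a message body from the topic.
	event := []byte(`{"order_id": "1234", "amount": 9.99}`)
	fmt.Println(Handle(event))
	// Prints: Email customer in response to event: {"order_id": "1234", "amount": 9.99}
}
```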
Build, push and deploy the function with a single command:
$ faas-cli up
The kafka-connector will now rebuild its topic map and detect that the “email-receipt” function wants to be invoked with messages published on the payment-received topic.
You can see the response of the function in two places: the function’s logs and in the connector’s logs. This is configurable in the helm chart.
Produce messages on the topic
Let’s proceed by opening two terminals: one will be used to check the output of the function and the other will create messages on the topic.
In the first terminal follow the Kafka Connector’s logs:
$ kubectl logs deploy/kafka-connector -n openfaas \
  --tail 100 \
  --follow
In the second terminal, deploy a client for Kafka to produce messages:
$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:6.1.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"
EOF

pod/kafka-client created
Connect to the pod and produce messages on the topic:
# Connect to a shell within the pod:
kubectl exec -it kafka-client -- /bin/bash
# Create the topic
kafka-topics \
--zookeeper cp-helm-charts-cp-zookeeper-headless:2181 \
--topic payment-received \
--create --partitions 1 \
--replication-factor 1 --if-not-exists
# Create a message
MESSAGE="`date -u`"
# Produce a test message to the topic
echo "$MESSAGE" | kafka-console-producer --broker-list \
cp-helm-charts-cp-kafka-headless:9092 \
--topic payment-received
You’ll now see your messages being sent to the payment-received topic and then the function being invoked.
OpenFaaS kafka-connector PRO Version: 0.6.0-rc3 Commit: 3784d5f35d1b6e090e37f211d6af1e51136ff9d6
2021/12/15 09:56:34 Licensed to: alex <[email protected]>, expires: 77 day(s)
2021/12/15 09:56:34 Broker: cp-helm-charts-cp-kafka-headless.default:9092 Topic: [payment-received]
Gateway: http://gateway.openfaas:8080
Rebuild interval: 30.000000s
Use TLS: false
Use SASL: false
2021/12/15 09:56:34 Binding to topics: [payment-received]
2019/04/09 18:37:10 Syncing topic map
2019/04/09 18:37:12 Invoke function: email-receipt
[#4] Received on [payment-received,0]: 'Kafka and go'
[200] payment-received => email-receipt
Hello, Go. You said: Kafka and go
2019/04/09 18:37:13 Syncing topic map
2019/04/09 18:37:16 Syncing topic map
Dealing with high-load / long-running functions
If you’re dealing with either very high load or long-running functions, there is a way to relieve pressure and defer the execution of your functions. This uses OpenFaaS’ built-in asynchronous invocation mode.
Just install the chart again, but this time add:
--set asyncInvocation=true
In this mode, work is consumed immediately from Kafka, and then buffered in the built-in NATS queue in OpenFaaS, to be executed by the queue-worker.
See the logs of the queue worker as it executes your requests:
kubectl logs -n openfaas deploy/queue-worker
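The hand-off in asynchronous mode can be pictured with a buffered channel. The sketch below is only an analogy under stated assumptions: the channel stands in for the NATS queue, the hypothetical `drain` function stands in for the queue-worker, and none of it reflects how the connector or the queue-worker are actually implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// drain simulates a queue-worker (hypothetical helper): it executes buffered
// work items one at a time and collects the results in order.
func drain(queue <-chan string, invoke func(string) string) []string {
	var results []string
	for msg := range queue {
		results = append(results, invoke(msg))
	}
	return results
}

func main() {
	// The buffered channel stands in for the queue between connector and worker.
	queue := make(chan string, 10)

	var wg sync.WaitGroup
	var results []string
	wg.Add(1)
	go func() {
		defer wg.Done()
		results = drain(queue, func(msg string) string {
			return "processed " + msg
		})
	}()

	// The consumer hands messages off without waiting for each one to finish.
	for _, msg := range []string{"payment-1", "payment-2", "payment-3"} {
		queue <- msg
	}
	close(queue)

	wg.Wait()
	fmt.Println(results) // [processed payment-1 processed payment-2 processed payment-3]
}
```

The producer side returns as soon as a message is buffered, which is why slow functions no longer hold up consumption from the topic.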
Alternatively, you can update the upstreamTimeout value to something longer than the default and keep the existing behaviour:
--set upstreamTimeout=1m
Wrapping up
There are many real-world examples where an event-driven approach with Apache Kafka can help you integrate your existing tools and applications, and extend the functionality of existing systems without risking regressions.
Let me give you an example. Suppose that whenever a new customer signs up and creates an account, we need to process their information in various ways in addition to just storing it in a SQL table.
We could publish their data on a topic such as customer-signup. With an event-driven approach using OpenFaaS and the kafka-connector, we can broadcast a message on the customer-signup topic and then process it in a number of ways using different functions: for example, to check their credit score, update a lead in Salesforce, or even schedule a welcome pack in the post. At any time you can extend the workflow of tasks for a customer-signup message by just defining a new function and giving it an annotation of topic: customer-signup.
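The fan-out in this example can be sketched as one topic with several subscribers. The Go code below is an illustration only: `Handler` and `Dispatch` are hypothetical names, and the three handlers are stand-ins for separately deployed functions rather than real OpenFaaS functions.

```go
package main

import "fmt"

// Handler is a stand-in for a function invoked with a message body.
type Handler func(body string) string

// Dispatch invokes every handler subscribed to the topic, in registration
// order, and returns their responses.
func Dispatch(subs map[string][]Handler, topic, body string) []string {
	var responses []string
	for _, h := range subs[topic] {
		responses = append(responses, h(body))
	}
	return responses
}

func main() {
	// Hypothetical subscribers for the customer-signup topic.
	subs := map[string][]Handler{
		"customer-signup": {
			func(b string) string { return "credit-check: " + b },
			func(b string) string { return "crm-update: " + b },
			func(b string) string { return "welcome-pack: " + b },
		},
	}

	for _, r := range Dispatch(subs, "customer-signup", "alice@example.com") {
		fmt.Println(r)
	}
}
```

Adding a fourth step to the workflow means appending one more handler to the slice, which mirrors deploying one more function with the same topic annotation.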
Now that I’ve shown you how to connect to Kafka and explored a real-world use-case, it’s over to you to try it.
You can use your existing OpenFaaS Pro license, or apply for a 14-day trial: Find out more
If you would like to remove the kafka-connector, use helm to delete it:
$ helm uninstall kafka-connector --namespace openfaas
Going further
The kafka-connector implements the Connector SDK. Check out the SDK, written in Go, to see how you can start connecting your own events and triggers.
Other examples include the Cron Connector and the vCenter Connector. You can view the other triggers here.
Editor(s): Alex Ellis