
In this post I will show you how you can build subscriptions between your OpenFaaS functions and your Apache Kafka topics. I’ll be using Kubernetes to show you around, but the connector-sdk works with any OpenFaaS provider.

OpenFaaS is solving real problems for our end-user community, many of whom now rely on the project in production and for core services. The OpenFaaS Pro kafka-connector was created to help commercial users integrate functions into their existing systems.

OpenFaaS functions and microservices are accessible over HTTP endpoints via the Gateway service, but let’s explore how other events can be used to trigger our functions.
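
For reference, this is what the default HTTP trigger looks like: any HTTP client can call a function via the Gateway’s /function/<name> route. Here is a minimal sketch in Go, assuming a local gateway on port 8080 and the email-receipt function we’ll build later in this post:

package main

import (
    "bytes"
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Invoke a function through the OpenFaaS Gateway over plain HTTP.
    // Assumes a local gateway on port 8080 and a deployed "email-receipt" function.
    resp, err := http.Post(
        "http://127.0.0.1:8080/function/email-receipt", // gateway URL + /function/<name>
        "text/plain",
        bytes.NewBufferString("order-1234"),
    )
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(body))
}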

This tutorial describes the Kafka connector, which is part of the OpenFaaS Pro bundle. Find out more

Apache Kafka

According to Datadog:

Apache Kafka is an open-source stream-processing software platform. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a “massively scalable pub/sub message queue designed as a distributed transaction log”.

At a high level, the important concepts are Producers, Consumers and Brokers, which communicate with each other via messages attached to Topics.

The Producer sends messages on specific Topics to the Broker, and Consumers poll the topics of interest from the Brokers. This approach is popular in distributed systems because it decouples direct communication between services and allows messages to be replayed or redelivered when required from a persistent log.
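
To make that concrete, here is a minimal producer sketch in Go using the Shopify/sarama client; the broker address and topic name are placeholders for whatever your cluster uses:

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true // required by SyncProducer

    // Connect to the broker (placeholder address)
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Publish one message on a topic; the broker appends it to a persistent log
    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: "payment-received",
        Value: sarama.StringEncoder("order-1234"),
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("stored at partition=%d offset=%d\n", partition, offset)
}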

See also: Apache Kafka Documentation.

Kafka Connector

The kafka-connector is designed to connect Kafka topics to OpenFaaS Functions. After deploying the kafka-connector and pointing it at your broker, you can connect functions to topics by adding a simple annotation via your functions’ stack.yml file.

Conceptual architecture diagram:

The connector makes use of the connector-sdk, a Golang library which periodically queries the Gateway’s list of functions and then builds a map between each function and a topic. This map is then used to invoke functions when messages arrive on a given topic.

Each OpenFaaS connector that the community develops can take advantage of this shared functionality and its only responsibility is to read from a data source or queue and forward the message on.
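
As a rough sketch of what a custom connector looks like in Go (based on the connector-sdk’s example code; field names and signatures may differ between SDK versions):

package main

import (
    "os"
    "time"

    "github.com/openfaas/connector-sdk/types"
    "github.com/openfaas/faas-provider/auth"
)

func main() {
    // Basic-auth credentials for the OpenFaaS Gateway
    creds := &auth.BasicAuthCredentials{
        User:     "admin",
        Password: os.Getenv("OPENFAAS_PASSWORD"),
    }

    config := &types.ControllerConfig{
        GatewayURL:      "http://127.0.0.1:8080",
        RebuildInterval: time.Second * 30, // how often to re-sync the topic map
        PrintResponse:   true,
    }

    controller := types.NewController(creds, config)
    controller.BeginMapBuilder() // starts polling the Gateway for annotated functions

    // A real connector would read from its event source here;
    // this loop just simulates an incoming message every 10 seconds.
    for {
        data := []byte("simulated event")
        controller.Invoke("payment-received", &data)
        time.Sleep(time.Second * 10)
    }
}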

See also: Triggers & Events in OpenFaaS

Pre-requisites

For this tutorial we will use Kubernetes. I have chosen minikube for my own local cluster, but alternatives are available.

Install OpenFaaS using the getting started guide.

Install the faas-cli

Set your OPENFAAS_URL environment variable so that it points at your gateway, for example: export OPENFAAS_URL=http://127.0.0.1:8080.

Deploy Apache Kafka

A development version of Apache Kafka has been made available so that you can get this tutorial up and running in a few minutes. You can also customise the connector to use your existing deployment of Kafka or to use a managed provider.

For a self-hosted version, you can use Confluent’s chart:

arkade install kafka

Deploy the connector with helm

Create the required secret with your OpenFaaS Pro license code:

$ kubectl create secret generic \
    -n openfaas \
    openfaas-license \
    --from-file license=$HOME/.openfaas/LICENSE

Add the OpenFaaS charts repository:

$ helm repo add openfaas https://openfaas.github.io/faas-netes/
$ helm repo update

Install the Kafka Connector with default values:

$ export BROKER_HOST=cp-helm-charts-cp-kafka-headless.default:9092
$ helm upgrade kafka-connector openfaas/kafka-connector \
    --install \
    --namespace openfaas \
    --set topics="payment-received" \
    --set brokerHost="$BROKER_HOST" \
    --set printResponse="true" \
    --set printResponseBody="true"
    

Set topics to the topics you want to subscribe to, as a comma-separated list without spaces, e.g. topics="payment-received,customer-signup"

If you deployed Kafka to a remote location, or to a different namespace or port, then update the brokerHost value accordingly.

We have now deployed the following components:

  • Zookeeper
  • Broker Host with Producer
  • kafka-connector

Subscribe to a topic

In order to consume topics via the connector we need to apply an annotation with a key of topic, for example: topic: payment-received. This should match at least one of the topics we defined in the earlier step for the connector.

Create a Docker Hub account if you don’t already have one and sign in with docker login.

Create a new function in Go:

# Replace with your container registry or Docker Hub account:
$ export OPENFAAS_PREFIX=docker.io/alexellis2
# Create a function in Go
$ faas-cli new email-receipt \
  --lang=go
$ mv email-receipt.yml stack.yml

We also renamed the function’s YAML file to stack.yml (the default).

The function is a simple Hello World written in Go. You can edit it if you want, but for simplicity we will keep the default message.

Edit the stack.yml file by adding a topic annotation with the value we configured for the Kafka Connector: payment-received. The file should look like this:

provider:
  name: openfaas
  gateway: http://127.0.0.1:8080
functions:
  email-receipt:
    lang: go
    handler: ./email-receipt
# ...
    annotations:
      topic: payment-received

Edit the handler.go file:

package function

import (
    "fmt"
)

// Handle a serverless request
func Handle(req []byte) string {
    return fmt.Sprintf("Email customer in response to event: %s", string(req))
}
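If you want a quick check of the handler’s behaviour, a small unit test could look like this (a hypothetical handler_test.go placed next to handler.go; how you run go test depends on the template’s module setup):

package function

import (
    "strings"
    "testing"
)

// Hypothetical test for the default handler above
func TestHandleEchoesEventPayload(t *testing.T) {
    out := Handle([]byte("order-1234"))

    if !strings.Contains(out, "order-1234") {
        t.Fatalf("expected the event payload to be echoed, got: %q", out)
    }
}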

Build, push and deploy the function with a single command:

$ faas-cli up

The kafka-connector will now rebuild its topic map and detect that the “email-receipt” function wants to be invoked with messages published on the payment-received topic.

You can see the response of the function in two places: the function’s logs and the connector’s logs. This is configurable in the helm chart.

Produce messages on the topic

Let’s proceed by opening two terminals: one will be used to check the output of the function and the other will create messages on the topic.

In the first terminal, follow the Kafka Connector’s logs:

$ kubectl logs deploy/kafka-connector -n openfaas \
    --tail 100 \
    --follow

In the second terminal, deploy a client for Kafka to produce messages:

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:6.1.0
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
EOF
pod/kafka-client created

Connect to the pod and produce messages on the topic:

# Connect to a shell within the pod:
kubectl exec -it kafka-client -- /bin/bash
# Create the topic
kafka-topics \
  --zookeeper cp-helm-charts-cp-zookeeper-headless:2181 \
  --topic payment-received \
  --create --partitions 1 \
  --replication-factor 1 --if-not-exists
# Create a message
MESSAGE="`date -u`"
# Produce a test message to the topic
echo "$MESSAGE" | kafka-console-producer --broker-list \
  cp-helm-charts-cp-kafka-headless:9092 \
  --topic payment-received

You’ll now see your messages being sent to the payment-received topic and then the function being invoked.

OpenFaaS kafka-connector PRO    Version: 0.6.0-rc3      Commit: 3784d5f35d1b6e090e37f211d6af1e51136ff9d6
2021/12/15 09:56:34 Licensed to: alex <[email protected]>, expires: 77 day(s)
2021/12/15 09:56:34 Broker: cp-helm-charts-cp-kafka-headless.default:9092       Topic: [payment-received]
Gateway: http://gateway.openfaas:8080
Rebuild interval: 30.000000s
Use TLS: false
Use SASL: false
2021/12/15 09:56:34 Binding to topics: [payment-received]
2019/04/09 18:37:10 Syncing topic map
2019/04/09 18:37:12 Invoke function: email-receipt
[#4] Received on [payment-received,0]: 'Kafka and go'
[200] payment-received => email-receipt
Hello, Go. You said: Kafka and go
2019/04/09 18:37:13 Syncing topic map
2019/04/09 18:37:16 Syncing topic map

Dealing with high-load / long-running functions

If you’re dealing with either very high load or long-running functions, there is a way to release pressure and defer the execution of your functions. This uses OpenFaaS’ built-in asynchronous invocation mode.

Just install the chart again, but this time add:

--set asyncInvocation=true

In this mode, work is consumed immediately from Kafka, and then buffered in the built-in NATS queue in OpenFaaS, to be executed by the queue-worker.

See the logs of the queue-worker as it executes your requests:

kubectl logs -n openfaas deploy/queue-worker

Alternatively, you can update the upstreamTimeout value to something longer than the default and keep the existing behaviour:

--set upstreamTimeout=1m

Wrapping up

There are multiple real-world examples where an event-driven approach with Apache Kafka can help integrate your existing tools and applications, and extend the functionality of existing systems without risking regressions.

Let me give you an example. Suppose that whenever a new customer signs up and creates an account, we may need to process their information in various ways in addition to just storing it in a SQL table.

We could publish their data on a topic such as: customer-signup. With an event-driven approach using OpenFaaS and the kafka-connector we can now broadcast a message on the customer-signup topic and then process it in a number of ways with different functions: check their credit score, update a lead in Salesforce, or even schedule a welcome pack in the post. So at any time you can extend the workflow of tasks for a customer-signup message by defining a new function and giving it an annotation of topic: customer-signup, as sketched below.
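
For illustration, a stack.yml for that scenario could define several functions sharing the same annotation; the function names and handlers here are hypothetical:

provider:
  name: openfaas
  gateway: http://127.0.0.1:8080
functions:
  credit-check:
    lang: go
    handler: ./credit-check
    annotations:
      topic: customer-signup
  salesforce-lead:
    lang: go
    handler: ./salesforce-lead
    annotations:
      topic: customer-signup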

Now that I’ve shown you how to connect to Kafka and explored a real-world use-case, it’s over to you to try it.

You can use your existing OpenFaaS Pro license, or apply for a 14-day trial: Find out more

If you would like to remove the kafka-connector, use helm to delete it:

$ helm delete kafka-connector --namespace openfaas

Going further

The kafka-connector implements the Connector SDK; check out the SDK written in Go to see how you can start connecting your own events and triggers.

Other examples include the Cron Connector and the vCenter Connector. You can view the other triggers here.

Editor(s): Alex Ellis

Learn use-cases for open-source serverless functions and how to integrate them into your existing workflows.

Through the hands-on exercises you will build your own JavaScript functions using Node.js. You’ll add custom npm modules, integrate with other HTTP APIs, enable authentication and monitor the endpoints with Grafana.

Find out more on Gumroad