Overview

In a distributed architecture, a message broker is a key component: asynchronous messaging decouples the different components and helps avoid cascading failures.

Apache Kafka is one of the most mature message brokers. It was originally developed at LinkedIn and later donated to the Apache community.

Installing and managing an Apache Kafka cluster comes with specific challenges. One of them is securing the different components involved in the Apache Kafka ecosystem.

A production Kafka ecosystem is usually composed of:

  • ZooKeeper cluster: stores Kafka metadata such as topics, broker configuration, and partitions. A ZooKeeper ensemble has more than one node, usually three or five (an odd number, so that a majority quorum can be formed).
  • Brokers: a cluster of brokers that stores data within topics. In a production environment, at least three brokers are used to ensure the resiliency and fault tolerance of the system.
  • Producer apps: client applications responsible for producing data to a given topic.
  • Consumer apps: client applications responsible for consuming data from topics.
  • Administration tools: tools such as the Kafka CLI, which allow an administrator to manage the cluster.

    As you can see, the Kafka ecosystem is a distributed system: data is exchanged between the different components over the network, and each component has its own responsibilities. By default, data is exchanged in clear text, without encryption, authentication, or authorization, which exposes the system to security issues.

    In this post we will see how to secure the traffic between the different components by implementing encryption, authentication, and authorization.

    You can download the source code for this post from GitHub.

    Set up your own PKI

    Let's start by creating our PKI (public key infrastructure).

    For this post I am using a self-signed certificate as the certificate authority (CA); in a production environment you should use your company's CA. We need to create a server certificate for each node in the Kafka infrastructure: in our example, three ZK nodes and three Apache Kafka brokers.

    We also need to create a client certificate for the Kafka admin tools. This certificate will be used later to authenticate the admin to the Apache Kafka cluster.

    Kafka is a Java application, so we need to package the server certificates and the CA certificate in JKS (Java KeyStore) format.

  • Trust stores contain the CA certificate
  • Key stores contain the server certificate and its private key

    You can use kafka-pki/generate-ca.sh to generate the self-signed CA certificate:

    #CA Certificate
    openssl genrsa  -out rootCA.key 4096
    openssl req -x509 -new -nodes -key rootCA.key \
            -sha256 -days 1024 \
            -subj "/C=CA/ST=QC/L=Montreal/O=Digital Infornation Inc CA Root Authority/OU=IT" \
            -out rootCA.crt
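
Before distributing the CA, it can help to check what was actually generated. The sketch below assumes that rootCA.crt sits in the current directory; the helper name print_cert_summary is only illustrative and is not part of the post's repository.

```shell
# Print the subject, issuer and validity window of a PEM certificate.
# For a self-signed CA, the subject and the issuer should be identical.
print_cert_summary() {
    cert_file="$1"
    openssl x509 -in "$cert_file" -noout -subject -issuer -dates
}

# Example: print_cert_summary rootCA.crt
```

The same helper can be pointed at any PEM certificate, for instance a signed server certificate before it is imported into a key store.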
    

    Once that is done, you can use generate-truststore.sh to create a trust store for each node. The first argument is the FQDN of your node and the second argument is the password used to secure the JKS.

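    INSTANCE=$1    # FQDN of the node
    PASSWORD=$2    # password protecting the JKS
    CA_ALIAS="rootCA"
    # Must match the file written by generate-ca.sh (file names are case-sensitive on Linux)
    CA_CERT_FILE="rootCA.crt"
    #### Generate Truststore and import ROOT CA certificate ####
    keytool -keystore truststore/$INSTANCE.truststore.jks \
     -import -alias $CA_ALIAS -file $CA_CERT_FILE \
     -storepass $PASSWORD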
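
To confirm that the CA certificate was imported, the trust store can be listed with keytool. This is a minimal sketch that assumes the truststore/ folder layout used by the script above; list_truststore is a hypothetical helper name.

```shell
# List the entries of a node's trust store.
# $1: node FQDN, $2: trust store password
list_truststore() {
    keytool -list -keystore "truststore/$1.truststore.jks" -storepass "$2"
}

# Example: list_truststore broker-1 password
```

The listing should contain a trusted certificate entry under the alias chosen in the script (rootCA).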
    

    The last step is to create the key stores. A key store contains a server certificate signed by the CA, its private key, and the CA certificate. Use the generate-keystore.sh script to create a key store for each node. The first argument is the FQDN of your node and the second argument is the password used to secure the JKS.

    COMMON_NAME=$1
    PASSWORD=$2
    ORGANIZATIONAL_UNIT="IT"
    ORGANIZATION="Digital Infornation Inc CA Root Authority"
    CITY="Montreal"
    STATE="QC"
    COUNTRY="CA"
    CA_ALIAS="ca-root"
    # File names must match the ones written by generate-ca.sh (case-sensitive on Linux)
    CA_CERT_FILE="rootCA.crt"
    CA_KEY_FILE="rootCA.key"
    VALIDITY_DAYS=365
    # Generate Keystore with Private Key
    keytool -keystore keystore/$COMMON_NAME.keystore.jks \
                -alias $COMMON_NAME -validity $VALIDITY_DAYS \
                -genkey -keyalg RSA \
                -storepass $PASSWORD \
                -dname "CN=$COMMON_NAME, OU=$ORGANIZATIONAL_UNIT, O=$ORGANIZATION, L=$CITY, ST=$STATE, C=$COUNTRY"
    # Generate Certificate Signing Request (CSR) using the newly created KeyStore
    keytool -keystore keystore/$COMMON_NAME.keystore.jks \
             -storepass $PASSWORD \
             -alias $COMMON_NAME -certreq -file $COMMON_NAME.csr
    # Sign the CSR using the custom CA
    openssl x509 -req -CA $CA_CERT_FILE -CAkey $CA_KEY_FILE \
             -in $COMMON_NAME.csr -out $COMMON_NAME.signed \
             -days $VALIDITY_DAYS -CAcreateserial
    # Import ROOT CA certificate into Keystore
    keytool -keystore keystore/$COMMON_NAME.keystore.jks \
        -storepass $PASSWORD \
        -alias $CA_ALIAS -importcert -file $CA_CERT_FILE
    # Import newly signed certificate into Keystore
    keytool -keystore keystore/$COMMON_NAME.keystore.jks \
        -storepass $PASSWORD \
        -alias $COMMON_NAME -importcert -file $COMMON_NAME.signed
    # Clean-up (the .srl serial file is named after the CA certificate file)
    rm $COMMON_NAME.csr
    rm $COMMON_NAME.signed
    rm rootCA.srl
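
Since both scripts take the node name and the store password, the stores for the whole cluster can be generated in one loop. This is a minimal sketch, assuming that both scripts are run from the folder that contains them and that the nodes are named as in the docker-compose files of this post (zookeeper-1 to zookeeper-3, broker-1 to broker-3, plus admin for the admin tools). The helper names list_nodes and generate_all_stores are illustrative.

```shell
# Names of every node that needs a trust store and a key store.
list_nodes() {
    for i in 1 2 3; do echo "zookeeper-$i"; done
    for i in 1 2 3; do echo "broker-$i"; done
    echo "admin"
}

# Run both generation scripts for each node, using the same store password.
generate_all_stores() {
    password="$1"
    for node in $(list_nodes); do
        ./generate-truststore.sh "$node" "$password"
        ./generate-keystore.sh "$node" "$password"
    done
}

# Example: generate_all_stores password
```

Note that keytool may ask for confirmation when importing a certificate, so the loop is likely to remain interactive unless the scripts are adapted.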
    

    Encryption

    Now that the PKI is set up, we can start encrypting the data exchanged between the different components. I am using a docker-compose file to simulate the Kafka ecosystem, with a three-node ZK quorum and three Kafka brokers.

    We start by sharing the JKS files with each Docker container by mounting them from the host as volumes.

    Then we configure ZK to use SSL when replicating data between the nodes of the quorum. We also configure a secure client port, so that the Apache Kafka brokers, which are ZK clients, can use a secure endpoint.

    The same configuration is duplicated on the two other nodes of the ZK quorum.

      zookeeper-1:
        image: zookeeper:3.7.0
        hostname: zookeeper-1
        container_name: zookeeper-1
        volumes:
            - ../kafka-pki/keystore/zookeeper-1.keystore.jks:/security/zookeeper-1.keystore.jks
            - ../kafka-pki/truststore/zookeeper-1.truststore.jks:/security/zookeeper-1.truststore.jks
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
          ZOO_CFG_EXTRA: "sslQuorum=true
                    portUnification=false
                    serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
                    ssl.quorum.hostnameVerification=false
                    ssl.quorum.keyStore.location=/security/zookeeper-1.keystore.jks
                    ssl.quorum.keyStore.password=password
                    ssl.quorum.trustStore.location=/security/zookeeper-1.truststore.jks
                    ssl.quorum.trustStore.password=password
                    secureClientPort=2281
                    ssl.hostnameVerification=false
                    ssl.keyStore.location=/security/zookeeper-1.keystore.jks
                    ssl.keyStore.password=password
                    ssl.trustStore.location=/security/zookeeper-1.truststore.jks
                    ssl.trustStore.password=password"
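
Between the three ZooKeeper services, only the node name changes: ZOO_MY_ID and the key store and trust store file names. The sketch below prints the node-specific store locations for a given node id, assuming the /security mount path used above; zk_tls_settings is a hypothetical helper, not something provided by the ZooKeeper image.

```shell
# Print the node-specific TLS store locations for ZooKeeper node id $1.
# The "ssl.quorum" prefix covers quorum traffic, "ssl" covers the secure client port.
zk_tls_settings() {
    node="zookeeper-$1"
    for prefix in ssl.quorum ssl; do
        echo "$prefix.keyStore.location=/security/$node.keystore.jks"
        echo "$prefix.trustStore.location=/security/$node.truststore.jks"
    done
}

# Example: zk_tls_settings 2
```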
    

    From the broker's perspective, we need to configure each Kafka broker to connect to the ZK secure port and to replicate topic data between brokers using SSL.

    We need to add a listener using the SSL protocol, configure the broker with its key store and trust store, and force inter-broker communication to use the SSL protocol.

    The same configuration is duplicated on all Kafka cluster nodes.

    broker-1:
        image: confluentinc/cp-kafka:7.0.1
        hostname: broker-1
        container_name: broker-1
        depends_on:
          - zookeeper-1
          - zookeeper-2
          - zookeeper-3
        ports:
          - "9191:9191"
        volumes:
          - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
          - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
          - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL:SSL
          KAFKA_ADVERTISED_LISTENERS: SSL://broker-1:9191
          KAFKA_LISTENERS: SSL://broker-1:9191
          KAFKA_DEFAULT_REPLICATION_FACTOR: 3
          KAFKA_MIN_INSYNC_REPLICAS: 2
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
          KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
          KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
          KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
          KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
          KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password
          KAFKA_SSL_CLIENT_AUTH: none
          KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
          KAFKA_SSL_KEYSTORE_PASSWORD: password
          KAFKA_SSL_KEY_PASSWORD: password
          KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
          KAFKA_SSL_TRUSTSTORE_PASSWORD: password
          KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
          KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
          KAFKA_SSL_KEY_CREDENTIALS: password.txt
          KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
          KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt
          KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL

    From the consumer and producer perspective, we need to adjust the configuration to use the SSL protocol, by adding the CA public certificate to the application's trust store.

    Producer

    bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
    security.protocol=SSL
    ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
    

    Consumer

    bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
    security.protocol=SSL
    ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
    group.id=consumer.group-1
    

    Authentication

    Now that traffic is encrypted, the next step is to enforce authentication. Apache Kafka offers different protocols to implement authentication. In this post we are going to use mutual SSL (mTLS) to authenticate brokers to each other, since brokers need to communicate to replicate data. mTLS will also be used to authenticate the administration tools.

    For client applications (producers and consumers), we are going to use SASL/SCRAM, which lets them authenticate with a user name and a secret.

    We need to add another Kafka listener with the SASL_SSL security protocol, adapt the configuration to enforce Kafka client authentication, and enable the SCRAM-SHA-512 SASL mechanism.

    To enable SCRAM, we need to add a JAAS configuration and pass it to the broker through the java.security.auth.login.config system property.

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };
    

    The same configuration must be applied to each broker of the Apache Kafka cluster.

    broker-1:
        image: confluentinc/cp-kafka:7.0.1
        hostname: broker-1
        container_name: broker-1
        depends_on:
          - zookeeper-1
          - zookeeper-2
          - zookeeper-3
        ports:
          - "9191:9191"
          - "9291:9291"
        volumes:
            - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
            - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
            - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
            - ./jaas/sasl-scram/ssl-scram-jaas-broker.conf:/etc/kafka/secrets/ssl-scram-jaas-broker.conf
        environment:
              KAFKA_BROKER_ID: 1
              KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
              KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_SSL:SASL_SSL,SSL:SSL
              KAFKA_ADVERTISED_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
              KAFKA_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
              KAFKA_DEFAULT_REPLICATION_FACTOR: 3
              KAFKA_MIN_INSYNC_REPLICAS: 2
              KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
              KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
              KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
              KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
              KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
              KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
              KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password
              KAFKA_SSL_CLIENT_AUTH: required
              KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
              KAFKA_SSL_KEYSTORE_PASSWORD: password
              KAFKA_SSL_KEY_PASSWORD: password
              KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
              KAFKA_SSL_TRUSTSTORE_PASSWORD: password
              KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
              KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
              KAFKA_SSL_KEY_CREDENTIALS: password.txt
              KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
              KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt
              KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
              KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512          
              KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/ssl-scram-jaas-broker.conf"
    

    To add a user name and secret for each client application, we are going to use the Kafka client tools. SASL/SCRAM stores the users and their credentials as Kafka configuration in ZooKeeper. The script below adds two users, producer and consumer, as Kafka application users.

    PRODUCER_USER="producer"
    PRODUCER_PASSWORD="P@ssMordProducerScram"
    CONSUMER_USER="consumer"
    CONSUMER_PASSWORD="P@ssMordConsumerScram"
    #Add Path to kafka bin folder
    KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
    chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh
    ${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
            --add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
            --entity-type users \
            --entity-name $PRODUCER_USER
    ${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
            --add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
            --entity-type users \
            --entity-name $CONSUMER_USER
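
To check that the credentials were stored, the same tool can describe a user. This is a sketch that assumes the same ZooKeeper address (localhost:12181) and Kafka bin folder as the script above; describe_scram_user is an illustrative name.

```shell
# Folder containing the Kafka CLI scripts (same default as the script above).
KAFKA_BIN_FOLDER="${KAFKA_BIN_FOLDER:-$HOME/Documents/work/kafka_2.13-3.1.0/bin}"

# Show the SCRAM configuration stored in ZooKeeper for one user.
describe_scram_user() {
    "${KAFKA_BIN_FOLDER}/kafka-configs.sh" --zookeeper localhost:12181 --describe \
            --entity-type users \
            --entity-name "$1"
}

# Example: describe_scram_user producer
```

The output should show a SCRAM-SHA-512 entry for the user holding derived values such as a salt and an iteration count, not the clear-text password.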
    

    The last modification we need to make is to adapt the client applications' configuration to use the SASL_SSL protocol with the SCRAM-SHA-512 mechanism.

    producer.config

    bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
    security.protocol=SASL_SSL
    sasl.mechanisms=SCRAM-SHA-512
    sasl.username=producer
    sasl.password=P@ssMordProducerScram
    ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
    

    consumer.config

    bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
    security.protocol=SASL_SSL
    sasl.mechanisms=SCRAM-SHA-512
    sasl.username=consumer
    sasl.password=P@ssMordConsumerScram
    ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
    group.id=consumer.group-1
    

    Authorization

    Access Control Lists (ACLs) are an important way to grant each Apache Kafka user specific rights on specific Kafka resources.

    Apache Kafka ships with a pluggable, out-of-the-box Authorizer implementation that uses Apache ZooKeeper to store all the ACLs. It is important to set ACLs because otherwise access to resources is limited to super users when an Authorizer is configured.

    The default behavior is that if a resource has no associated ACLs, then no one is allowed to access the resource including brokers, except super users.

    We need to configure the brokers as super users so that they can communicate with each other. We also need to configure the admin user (the user used by the admin client tools) as a super user. As a best practice, you should restrict the super user list to users authenticated with mTLS.

    We need to add the configuration below to each Kafka broker. Note that the super user list contains the subject name of each broker's server certificate and the subject name of the admin tools' client certificate.

        KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
        KAFKA_SUPER_USERS: "User:CN=broker-1,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                            User:CN=broker-2,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                            User:CN=broker-3,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                            User:CN=admin,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA"
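
Each super user entry is the certificate subject, in the order given to the -dname option of generate-keystore.sh, prefixed with User:. The sketch below assembles the list from the common names, assuming the default behaviour where the SSL principal is the full subject of the certificate; DN_SUFFIX and super_users are illustrative names.

```shell
# Subject suffix shared by every certificate generated in this post.
DN_SUFFIX="OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA"

# Build the semicolon-separated super user list for the given common names.
super_users() {
    result=""
    for cn in "$@"; do
        entry="User:CN=$cn,$DN_SUFFIX"
        if [ -z "$result" ]; then
            result="$entry"
        else
            result="$result;$entry"
        fi
    done
    echo "$result"
}

# Example: super_users broker-1 broker-2 broker-3 admin
```

Generating the value this way keeps the list in sync with the subject used when the certificates were created, since any difference in the subject string prevents the principal from matching.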
    

    To configure ACLs, we need to use the Kafka client tools. We create a topic, then grant the producer and consumer users specific access rules on it.

  • Grant the producer the right to publish events to the topic
  • Grant the consumer the right to consume events from the same topic with a specific consumer group

    The script below creates the desired configuration.

    #!/bin/bash
    PRODUCER_USER="producer"
    PRODUCER_PASSWORD="P@ssMordProducerScram"
    CONSUMER_USER="consumer"
    CONSUMER_PASSWORD="P@ssMordConsumerScram"
    TOPIC_TEST="myTestTopic"
    CONSUMER_GROUP="consumer.group-1"
    #Add Path to kafka bin folder
    KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
    chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh
    chmod +x ${KAFKA_BIN_FOLDER}/kafka-acls.sh
    ${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
            --add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
            --entity-type users \
            --entity-name $PRODUCER_USER
    ${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
            --add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
            --entity-type users \
            --entity-name $CONSUMER_USER
    ${KAFKA_BIN_FOLDER}/kafka-acls.sh \
      --bootstrap-server broker-1:9291 \
      --list \
      --command-config ./admin.properties
    ${KAFKA_BIN_FOLDER}/kafka-topics.sh \
      --bootstrap-server broker-1:9291 \
      --create \
      --topic $TOPIC_TEST \
      --partitions 3 \
      --command-config ./admin.properties
    ${KAFKA_BIN_FOLDER}/kafka-acls.sh \
      --bootstrap-server broker-1:9291 \
      --add \
      --allow-principal User:$PRODUCER_USER \
      --operation WRITE \
      --topic $TOPIC_TEST \
      --command-config ./admin.properties
    ${KAFKA_BIN_FOLDER}/kafka-acls.sh \
      --bootstrap-server broker-1:9291 \
      --add \
      --allow-principal User:$CONSUMER_USER \
      --operation READ \
      --topic $TOPIC_TEST \
      --command-config ./admin.properties
    ${KAFKA_BIN_FOLDER}/kafka-acls.sh \
      --bootstrap-server broker-1:9291 \
      --add \
      --allow-principal User:$CONSUMER_USER \
      --operation All \
      --group $CONSUMER_GROUP \
      --command-config ./admin.properties
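
Once the rules are created, they can be listed per topic to confirm the result. This is a minimal sketch that reuses the admin listener (broker-1:9291) and the admin.properties file from the script above; list_topic_acls is a hypothetical helper name.

```shell
# Folder containing the Kafka CLI scripts (same default as the script above).
KAFKA_BIN_FOLDER="${KAFKA_BIN_FOLDER:-$HOME/Documents/work/kafka_2.13-3.1.0/bin}"

# List the ACLs attached to one topic, authenticating with the admin certificate.
list_topic_acls() {
    "${KAFKA_BIN_FOLDER}/kafka-acls.sh" \
      --bootstrap-server broker-1:9291 \
      --list \
      --topic "$1" \
      --command-config ./admin.properties
}

# Example: list_topic_acls myTestTopic
```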
    

    To run these commands, we need to authenticate with the admin certificate, which is configured as a super user.

    admin.properties

    security.protocol=SSL
    ssl.truststore.location=~/Documents/work/kafka-security/kafka-pki/truststore/admin.truststore.jks
    ssl.truststore.password=password
    ssl.keystore.location=~/Documents/work/kafka-security/kafka-pki/keystore/admin.keystore.jks
    ssl.keystore.password=password
    ssl.key.password=password
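
Java clients read these store paths literally and do not expand ~, so it can be safer to generate the file with absolute paths. The sketch below assumes the kafka-pki folder layout used in this post; write_admin_properties and its arguments are illustrative.

```shell
# Write an admin.properties file with absolute store paths.
# $1: absolute path of the PKI folder, $2: store password, $3: output file
write_admin_properties() {
    pki_dir="$1"
    password="$2"
    out_file="$3"
    cat > "$out_file" <<EOF
security.protocol=SSL
ssl.truststore.location=$pki_dir/truststore/admin.truststore.jks
ssl.truststore.password=$password
ssl.keystore.location=$pki_dir/keystore/admin.keystore.jks
ssl.keystore.password=$password
ssl.key.password=$password
EOF
}

# Example:
# write_admin_properties "$HOME/Documents/work/kafka-security/kafka-pki" password ./admin.properties
```

Here the shell expands $HOME before the file is written, so the Kafka tools receive a full path.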
    

    There is no need to change the Kafka client configuration.