Setting up Client cert mutual authentication in a kafka hdf cluster

Note, If keytool not found on path, do this first for your local instalation of java

ln -s /usr/jdk64/jdk1.8.0_112/bin/keytool /bin/keytool

Create Server Certs

Generate server and CA cert from Kafka Node 1 (kafkanode1) - As kafka

mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095

## Generate a SSL key and certificate for Broker 1

keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode1_, OU=me, O=me, L=or, ST=fl, C=us)

##### Generating a CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY

## Importing ca to node1's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

Generate server cert from Kafka Node 2 (kafkaNode2) - As kafka

mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095

## Generate a SSL key and certificate for Broker 2
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode2_, OU=me, O=me, L=or, ST=fl, C=us)

At this time, copy ca-cert from node1 to node2 under /etc/kafka/conf/security and then import it to node2's truststore

## Importing ca to node2's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

Generate server cert for Kafka Node 3 (kafkaNode2) - As kafka

mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095

## Generate a SSL key and certificate for Broker 3
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode3_, OU=me, O=me, L=or, ST=fl, C=us)

At this time, copy ca-cert from node1 to node3 under /etc/kafka/conf/security and then import it to node3's truststore

## Importing ca to node3's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

Export the certificate from keystore for each broker

Export the certificate from the keystore for each broker by running the following command on brokerhost1, brokerhost2 and brokerhost3. This generates (make sure the name is unique per broker).

Kafka Node 1 (kafkaNode1) - As kafka

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file

Kafka Node 2 (kafkaNode2) - As kafka

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file

Now Copy kafkaNode2.cert-file to kafkaNode1 under /etc/kafka/conf/security

Kafka Node 3 (kafkaNode3) - As kafka

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file

Now Copy kafkaNode3.cert-file to kafkaNode1 under /etc/kafka/conf/security

Signing the server certs using ca

Sign the certificates with the CA by running the following command on brokerhost1 for each broker’s . Update the command with the CA password that was entered previously. This generates (make sure the out file name is unique per broker)

Kafka Node 1 (kafkaNode1) - As kafka

PASSWORD=_passwd_
VALIDITY=1095
openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode1_.cert-file -out cert-signed-_kafkaNode1_ -days 365 -CAcreateserial -passin pass:$PASSWORD

openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode2_.cert-file -out cert-signed-_kafkaNode2_ -days 365 -CAcreateserial -passin pass:$PASSWORD

openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode3_.cert-file -out cert-signed-_kafkaNode3_ -days 365 -CAcreateserial -passin pass:$PASSWORD

Now copy the signed certs (cert-signed-kafkaNode2 and cert-signed-kafkaNode3) from node1 to node2 and node3 to /etc/kafka/conf/security on those boxes

Finally import signed cert and ca cert in server keystore on Nodes

Kafka Node 1 (kafkaNode1) - As kafka

PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode1_

Kafka Node 2 (kafkaNode2) - As kafka

PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode2_

Kafka Node 3 (kafkaNode3) - As kafka

PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode3_

Now lets Generate a client cert for each service that would interact with kafka

If Kafka brokers are configured to require client authentication by setting ssl.client.auth to required or requested, you must create a client keystore. Run the following command on each client node where the producers and consumers will be running from, replacing with the node’s fully qualified domain name. This generates kafka.client.keystore.jks

Kafka Node 1 (kafkaNode1) - As kafka

mkdir /etc/kafka/conf/security/client
cd /etc/kafka/conf/security/client
cp /etc/kafka/conf/security/ca-cert .
cp /etc/kafka/conf/security/ca-key .
PASSWORD=_clientpasswd_
VALIDITY=1095
SERVICE=__TOPICNAME__

keytool -keystore kafka.client.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_TOPICNAME_, OU=me, O=me, L=or, ST=fl, C=us)
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file $SERVICE.cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in $SERVICE.cert-file -out cert-signed-$SERVICE -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed-$SERVICE
rm -rf ca-*
rm -rf cert-signed-_TOPICNAME_ _TOPICNAME_.cert-file

Enable SSL on Kafka HDF

In Ambari UI under the kafka-broker section, update the protocol from PLAINTEXT to SSL in the listeners property. Note that the hostname in the listeners property should remain as localhost. When starting Kafka from Ambari, “localhost” will be replaced with the actual hostname the broker is running on. If you wanna keep both SSL and PLAINTEXT enabled, use listeners=PLAINTEXT://localhost:6667,SSL://localhost:6668

Also update the security.inter.broker.protocol property under Advanced kafka-broker section from PLAINTEXT to SSL

Add following SSL Properties to Custom kafka-broker

ssl.client.auth=requested
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.key.password=_passwd_
ssl.keystore.location=/etc/kafka/conf/security/kafka.server.keystore.jks
ssl.keystore.password=_passwd_
ssl.keystore.type=JKS
ssl.truststore.location=/etc/kafka/conf/security/kafka.server.truststore.jks
ssl.truststore.password=_passwd_
ssl.truststore.type=JKS

Restart the Kafka service from Ambari for the changes to take effect. Verify that Kafka has started with the SSL endpoint indicated by the following message in /var/log/kafka/server.log on any Kafka broker node.

INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: EndPoint(_kafkanode01_,6667,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(_kafkanode01_,6668,ListenerName(SSL),SSL) (kafka.utils.ZkUtils)

Create a client ssl config file for kafka console consumer

Now if you wish to use Kafka Console client, you will need to supply a client-SSL.properties file to kafka in order to connect on SSL. Create following file in /usr/hdf/3.0.2.0-76/kafka/config folder of the server that you wish to run the client from. I am using Kafka node 1.

Kafka Node 1 (kafkaNode1) - As kafka

config/client-ssl.properties

security.protocol=SSL
ssl.keystore.location=/etc/kafka/conf/security/kafka.client.keystore.jks
ssl.keystore.password=_clientpasswd_
ssl.key.password=_clientpasswd_
ssl.truststore.location=/etc/kafka/conf/security/kafka.client.truststore.jks
ssl.truststore.password=_clientpasswd_
ssl.keystore.type=JKS
ssl.truststore.type=JKS

Now test out the SSL Config by firing up console producer and consumer on seperate windows

console-consumer

bin/kafka-console-consumer.sh  --topic _TOPICNAME_ --from-beginning --bootstrap-server _kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668 --consumer.config config/client-ssl.properties

console-producer

bin/kafka-console-producer.sh --broker-list _kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668 --topic _TOPICNAME_ --producer.config config/client-ssl.properties

Extract the keys

I like to use python for small to mid size projects. In order to use python to connect to Kafka over SSL, you will first have to export certificates and keys from the JKS container to PEM format to use them inside kafka-python

Assuming you were able to generate a client cert for your client by following the instructions above,, you now have two JKS containers: ‘kafka.client.keystore.jks’ and ‘kafka.client.truststore.jks’. The first one contains the signed client certificate, its private key and the ‘CARoot’ certificate used to sign them. The second one contains the certificate with which the client certificate and key were signed. Therefore, everything we need is contained in the ‘kafka.client.keystore.jks’ file. To get an overview of its content you can call

keytool -list -rfc -keystore kafka.client.keystore.jks

Now lets get to work. First, we will extract the client certificate:

keytool -exportcert -alias localhost -keystore kafka.client.keystore.jks -rfc -file certificate.pem

Next we will extract the clients key. This is not supported directly by keytool, which is why we have to convert the keystore to pkcs12 format first and then extract the private key from that:

keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias localhost -destkeystore cert_and_key.p12 -deststoretype PKCS12
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes

The second command only prints the key to STDOUT. From there it can be copied and pasted into ‘key.pem’. Make sure to copy the lines inclusive between —–BEGIN PRIVATE KEY—– and —–END PRIVATE KEY—–

Finally we will extract the CARoot certificate

keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem

Connect by kafka-python

Now we have the three files ‘certificate.pem’, ‘key.pem’, ‘CARoot.pem’. With kafka-python they can be passed as argument of the constructor of the consumer and producer:

import threading, logging, time
import multiprocessing
from datetime import datetime

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        producer = KafkaProducer(bootstrap_servers='_kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668',
                                 security_protocol='SSL',
                                 ssl_check_hostname=False,
                                 ssl_cafile='CARoot.pem',
                                 ssl_certfile='certificate.pem',
                                 ssl_keyfile='key.pem')

        while not self.stop_event.is_set():
            todays_date_dt=datetime.now()
            st=datetime.strftime(todays_date_dt, '%Y-%m-%d %H:%M:%S')
            st_b=str.encode(st)
            producer.send('_TOPICNAME_', st_b)
            time.sleep(1)

        producer.close()

class Consumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='_kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000,
                                 security_protocol='SSL',
                                 ssl_check_hostname=False,
                                 ssl_cafile='CARoot.pem',
                                 ssl_certfile='certificate.pem',
                                 ssl_keyfile='key.pem')
        consumer.subscribe(['_TOPICNAME_'])

        while not self.stop_event.is_set():
            for message in consumer:
                print(message)
                if self.stop_event.is_set():
                    break

        consumer.close()


def main():
    tasks = [
        Producer(),
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
        )
    main()

This concludes Client Cert Mutual authentication setup.

Please refer to next blog for instructions on how to setup appropriate ACL's for each topic and restrict it to one client cert.

Authorization setup in Ambari Kafka based on ACLs



Comments

comments powered by Disqus