Setting up client cert mutual authentication in a Kafka HDF cluster
Note: if keytool is not found on your path, first create a symlink to it from your local Java installation:
ln -s /usr/jdk64/jdk1.8.0_112/bin/keytool /bin/keytool
Create Server Certs
Generate server and CA cert from Kafka Node 1 (kafkanode1) - As kafka
mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095
## Generate a SSL key and certificate for Broker 1
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode1_, OU=me, O=me, L=or, ST=fl, C=us)
## Generating a CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY
## Importing ca to node1's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
Generate server cert from Kafka Node 2 (kafkaNode2) - As kafka
mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095
## Generate a SSL key and certificate for Broker 2
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode2_, OU=me, O=me, L=or, ST=fl, C=us)
At this time, copy ca-cert from node1 to node2 under /etc/kafka/conf/security and then import it to node2's truststore
## Importing ca to node2's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
Generate server cert from Kafka Node 3 (kafkaNode3) - As kafka
mkdir -p /etc/kafka/conf/security
cd /etc/kafka/conf/security
PASSWORD=_passwd_
VALIDITY=1095
## Generate a SSL key and certificate for Broker 3
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_kafkaNode3_, OU=me, O=me, L=or, ST=fl, C=us)
At this time, copy ca-cert from node1 to node3 under /etc/kafka/conf/security and then import it to node3's truststore
## Importing ca to node3's truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
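The interactive prompts in the key generation step can be avoided by passing the distinguished name and passwords on the command line. The following is a minimal sketch that only builds the keytool argument list for one broker. The helper name `genkey_command` is hypothetical, and `-keyalg RSA` is an addition that does not appear in the commands above; `-dname`, `-storepass` and `-keypass` are standard keytool options.

```python
def genkey_command(hostname, password, validity_days=1095,
                   keystore="kafka.server.keystore.jks"):
    """Build a non-interactive `keytool -genkey` argument list for one broker."""
    # Same distinguished name fields as the interactive answers shown above.
    dname = f"CN={hostname}, OU=me, O=me, L=or, ST=fl, C=us"
    return [
        "keytool", "-genkey",
        "-keystore", keystore,
        "-alias", "localhost",
        "-validity", str(validity_days),
        "-keyalg", "RSA",          # assumption: not part of the original command
        "-dname", dname,
        "-storepass", password,
        "-keypass", password,
    ]
```

The returned list can be handed to `subprocess.run(cmd, check=True)` on each broker. Keep in mind that passwords passed as arguments are visible in the process list while the command runs.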
Export the certificate from keystore for each broker
Export the certificate from the keystore for each broker by running the following command on kafkaNode1, kafkaNode2 and kafkaNode3. This generates a certificate signing request in a file named $HOSTNAME.cert-file.
Kafka Node 1 (kafkaNode1) - As kafka
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file
Kafka Node 2 (kafkaNode2) - As kafka
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file
Now copy kafkaNode2.cert-file to kafkaNode1 under /etc/kafka/conf/security
Kafka Node 3 (kafkaNode3) - As kafka
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file $HOSTNAME.cert-file
Now copy kafkaNode3.cert-file to kafkaNode1 under /etc/kafka/conf/security
Signing the server certs using ca
Sign the certificates with the CA by running the following commands on kafkaNode1, one for each broker's certificate request.
Kafka Node 1 (kafkaNode1) - As kafka
PASSWORD=_passwd_
VALIDITY=1095
openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode1_.cert-file -out cert-signed-_kafkaNode1_ -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode2_.cert-file -out cert-signed-_kafkaNode2_ -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
openssl x509 -req -CA ca-cert -CAkey ca-key -in _kafkaNode3_.cert-file -out cert-signed-_kafkaNode3_ -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
Now copy the signed certs (cert-signed-kafkaNode2 and cert-signed-kafkaNode3) from node1 to node2 and node3 to /etc/kafka/conf/security on those boxes
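Since the same signing command is repeated for every broker and the result then has to travel back to its node, the step can be written down as a small plan. This is a rough sketch, not a definitive script: the function name `signing_plan` is hypothetical, the validity is passed in explicitly, and actually copying the files (for example with scp) is left out.

```python
def signing_plan(hosts, ca_host, ca_password, validity_days):
    """Return (command, signed_file, copy_to) for each broker's request.

    copy_to is None for the CA host, because its signed cert is already local.
    """
    plan = []
    for host in hosts:
        signed_file = f"cert-signed-{host}"
        command = [
            "openssl", "x509", "-req",
            "-CA", "ca-cert", "-CAkey", "ca-key",
            "-in", f"{host}.cert-file",
            "-out", signed_file,
            "-days", str(validity_days),
            "-CAcreateserial",
            "-passin", f"pass:{ca_password}",
        ]
        copy_to = None if host == ca_host else host
        plan.append((command, signed_file, copy_to))
    return plan
```

Each command is meant to run in /etc/kafka/conf/security on the node that holds ca-key, after which every entry with a non-empty `copy_to` names the node that needs the signed file.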
Finally, import the CA cert and the signed cert into the server keystore on each node
Kafka Node 1 (kafkaNode1) - As kafka
PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode1_
Kafka Node 2 (kafkaNode2) - As kafka
PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode2_
Kafka Node 3 (kafkaNode3) - As kafka
PASSWORD=_passwd_
VALIDITY=1095
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed-_kafkaNode3_
Now let's generate a client cert for each service that will interact with Kafka
If Kafka brokers are configured to require client authentication by setting ssl.client.auth to required or requested, you must create a client keystore. Run the following commands for each client node where the producers and consumers will be running, replacing the placeholders (_clientpasswd_, _TOPICNAME_) with your own values.
Kafka Node 1 (kafkaNode1) - As kafka
mkdir /etc/kafka/conf/security/client
cd /etc/kafka/conf/security/client
cp /etc/kafka/conf/security/ca-cert .
cp /etc/kafka/conf/security/ca-key .
PASSWORD=_clientpasswd_
VALIDITY=1095
SERVICE=_TOPICNAME_
keytool -keystore kafka.client.keystore.jks -alias localhost -validity $VALIDITY -genkey
(CN=_TOPICNAME_, OU=me, O=me, L=or, ST=fl, C=us)
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file $SERVICE.cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in $SERVICE.cert-file -out cert-signed-$SERVICE -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed-$SERVICE
rm -rf ca-*
rm -rf cert-signed-_TOPICNAME_ _TOPICNAME_.cert-file
Enable SSL on Kafka HDF
In the Ambari UI, under the kafka-broker section, update the protocol from PLAINTEXT to SSL in the listeners property. Note that the hostname in the listeners property should remain localhost: when Kafka is started from Ambari, “localhost” is replaced with the actual hostname the broker is running on. If you want to keep both SSL and PLAINTEXT enabled, use listeners=PLAINTEXT://localhost:6667,SSL://localhost:6668
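To double-check a listeners value before pasting it into Ambari, it can help to split it into its parts. The following is a small sketch that only does string handling; the function name `parse_listeners` is hypothetical and nothing here talks to Ambari or Kafka.

```python
def parse_listeners(value):
    """Map each listener protocol in a listeners string to (host, port)."""
    endpoints = {}
    for item in value.split(","):
        # Each entry has the form PROTOCOL://host:port
        name, _, address = item.strip().partition("://")
        host, _, port = address.rpartition(":")
        endpoints[name] = (host, int(port))
    return endpoints


listeners = parse_listeners("PLAINTEXT://localhost:6667,SSL://localhost:6668")
print(listeners["SSL"])  # ('localhost', 6668)
```

With both protocols enabled, the SSL entry shows the port that the clients further down connect to.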
Also update the security.inter.broker.protocol property under the Advanced kafka-broker section from PLAINTEXT to SSL.
Add the following SSL properties to the Custom kafka-broker section:
ssl.client.auth=requested
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.key.password=_passwd_
ssl.keystore.location=/etc/kafka/conf/security/kafka.server.keystore.jks
ssl.keystore.password=_passwd_
ssl.keystore.type=JKS
ssl.truststore.location=/etc/kafka/conf/security/kafka.server.truststore.jks
ssl.truststore.password=_passwd_
ssl.truststore.type=JKS
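A missing or empty entry in this list is an easy mistake to make. As a rough sketch, the properties can be pasted into a string and checked for the keystore and truststore settings. The names `parse_properties`, `missing_ssl_keys` and `REQUIRED_SSL_KEYS` are made up for this example, and the list of required keys is simply taken from the properties above.

```python
REQUIRED_SSL_KEYS = (
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_ssl_keys(props):
    """Return the required SSL keys that are absent or have an empty value."""
    return [key for key in REQUIRED_SSL_KEYS if not props.get(key)]
```

An empty list from `missing_ssl_keys` means every keystore and truststore setting has a value; it says nothing about whether the files or passwords are correct.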
Restart the Kafka service from Ambari for the changes to take effect. Verify that Kafka has started with the SSL endpoint indicated by the following message in /var/log/kafka/server.log on any Kafka broker node.
INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: EndPoint(_kafkanode01_,6667,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(_kafkanode01_,6668,ListenerName(SSL),SSL) (kafka.utils.ZkUtils)
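Checking for this message can also be scripted. The sketch below pulls the endpoints out of a log line of the form shown above with a regular expression; the function names are hypothetical and the pattern is only fitted to that one message format.

```python
import re

# Matches entries such as EndPoint(host,6668,ListenerName(SSL),SSL)
ENDPOINT = re.compile(r"EndPoint\(([^,]+),(\d+),ListenerName\(([^)]+)\),([A-Z_]+)\)")


def registered_endpoints(log_line):
    """Return (host, port, listener, protocol) tuples found in a log line."""
    return [(host, int(port), listener, protocol)
            for host, port, listener, protocol in ENDPOINT.findall(log_line)]


def has_ssl_endpoint(log_line):
    """True if the broker registered at least one SSL endpoint."""
    return any(protocol == "SSL"
               for _, _, _, protocol in registered_endpoints(log_line))
```

Running `has_ssl_endpoint` over the lines of /var/log/kafka/server.log on each broker gives a quick yes or no after the restart.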
Create a client ssl config file for kafka console consumer
Now, if you wish to use the Kafka console client, you will need to supply a client-ssl.properties file to Kafka in order to connect over SSL. Create the following file in the /usr/hdf/3.0.2.0-76/kafka/config folder of the server that you wish to run the client from. I am using Kafka node 1.
Kafka Node 1 (kafkaNode1) - As kafka
config/client-ssl.properties
security.protocol=SSL
ssl.keystore.location=/etc/kafka/conf/security/client/kafka.client.keystore.jks
ssl.keystore.password=_clientpasswd_
ssl.key.password=_clientpasswd_
ssl.truststore.location=/etc/kafka/conf/security/client/kafka.client.truststore.jks
ssl.truststore.password=_clientpasswd_
ssl.keystore.type=JKS
ssl.truststore.type=JKS
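Because this file stores the keystore passwords in plain text, it is worth creating it with restrictive permissions. Here is a minimal sketch that writes the same eight properties; the function name `write_client_properties` is hypothetical, and it assumes a single password is used for the keystore, the key and the truststore, as in the file above.

```python
import os


def write_client_properties(path, keystore, truststore, password):
    """Write a client SSL properties file readable only by its owner."""
    lines = [
        "security.protocol=SSL",
        f"ssl.keystore.location={keystore}",
        f"ssl.keystore.password={password}",
        f"ssl.key.password={password}",
        f"ssl.truststore.location={truststore}",
        f"ssl.truststore.password={password}",
        "ssl.keystore.type=JKS",
        "ssl.truststore.type=JKS",
    ]
    # Mode 0o600 applies when the file is newly created; an existing file
    # keeps whatever permissions it already had.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write("\n".join(lines) + "\n")
```

The user running the console producer and consumer needs to be the owner of the file, otherwise the tools cannot read it.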
Now test out the SSL config by firing up the console producer and consumer in separate windows
console-consumer
bin/kafka-console-consumer.sh --topic _TOPICNAME_ --from-beginning --bootstrap-server _kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668 --consumer.config config/client-ssl.properties
console-producer
bin/kafka-console-producer.sh --broker-list _kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668 --topic _TOPICNAME_ --producer.config config/client-ssl.properties
Extract the keys
I like to use Python for small to mid-size projects. In order to use Python to connect to Kafka over SSL, you will first have to export certificates and keys from the JKS container to PEM format to use them inside kafka-python.
Assuming you were able to generate a client cert for your client by following the instructions above, you now have two JKS containers: ‘kafka.client.keystore.jks’ and ‘kafka.client.truststore.jks’. The first one contains the signed client certificate, its private key and the ‘CARoot’ certificate used to sign them. The second one contains the certificate with which the client certificate and key were signed. Therefore, everything we need is contained in the ‘kafka.client.keystore.jks’ file. To get an overview of its content you can call
keytool -list -rfc -keystore kafka.client.keystore.jks
Now let's get to work. First, we will extract the client certificate:
keytool -exportcert -alias localhost -keystore kafka.client.keystore.jks -rfc -file certificate.pem
Next we will extract the client's key. This is not supported directly by keytool, which is why we have to convert the keystore to PKCS12 format first and then extract the private key from that:
keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias localhost -destkeystore cert_and_key.p12 -deststoretype PKCS12
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes
The second command only prints the key to STDOUT. From there it can be copied and pasted into ‘key.pem’. Make sure to copy the lines between -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----, inclusive.
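The copy and paste step can also be done with a few lines of Python. This is a sketch under the assumption that the openssl output has been captured as text (for example by redirecting it to a file); the function name `extract_private_key` is hypothetical.

```python
import re

# Matches a PEM private key block, including the BEGIN and END lines.
KEY_BLOCK = re.compile(
    r"-----BEGIN (?P<kind>[A-Z ]*PRIVATE KEY)-----.*?-----END (?P=kind)-----",
    re.DOTALL,
)


def extract_private_key(openssl_output):
    """Return only the private key block from `openssl pkcs12` output."""
    match = KEY_BLOCK.search(openssl_output)
    if match is None:
        raise ValueError("no private key block found")
    return match.group(0) + "\n"
```

Writing the returned text to ‘key.pem’ drops the bag attribute lines that openssl prints before the key. The resulting file holds an unencrypted private key, so keep its permissions tight.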
Finally, we will extract the CARoot certificate:
keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem
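Before handing the PEM files to a Kafka client, they can be sanity-checked with Python's standard ssl module. The following is a minimal sketch; the function name `build_client_context` is hypothetical, and it only verifies that the files load and that the key matches the certificate, without connecting to a broker.

```python
import ssl


def build_client_context(cafile, certfile, keyfile):
    """Load the CA cert, client cert and client key into an SSLContext.

    Raises an OSError subclass if a file is missing, or ssl.SSLError if
    a file is malformed or the key does not match the cert.
    """
    context = ssl.create_default_context(cafile=cafile)
    # Hostname checking is switched off here to match ssl_check_hostname=False
    # in the kafka-python settings used in this post.
    context.check_hostname = False
    context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context
```

Calling `build_client_context('CARoot.pem', 'certificate.pem', 'key.pem')` without an exception is a good sign that the three exports succeeded.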
Connect by kafka-python
Now we have the three files ‘certificate.pem’, ‘key.pem’ and ‘CARoot.pem’. With kafka-python they can be passed as arguments to the constructors of the consumer and producer:
import threading, logging, time
import multiprocessing
from datetime import datetime
from kafka import KafkaConsumer, KafkaProducer

class Producer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        producer = KafkaProducer(bootstrap_servers='_kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668',
                                 security_protocol='SSL',
                                 ssl_check_hostname=False,
                                 ssl_cafile='CARoot.pem',
                                 ssl_certfile='certificate.pem',
                                 ssl_keyfile='key.pem')
        while not self.stop_event.is_set():
            todays_date_dt = datetime.now()
            st = datetime.strftime(todays_date_dt, '%Y-%m-%d %H:%M:%S')
            st_b = str.encode(st)
            producer.send('_TOPICNAME_', st_b)
            time.sleep(1)
        producer.close()

class Consumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='_kafkaNode1_:6668,_kafkaNode2_:6668,_kafkaNode3_:6668',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000,
                                 security_protocol='SSL',
                                 ssl_check_hostname=False,
                                 ssl_cafile='CARoot.pem',
                                 ssl_certfile='certificate.pem',
                                 ssl_keyfile='key.pem')
        consumer.subscribe(['_TOPICNAME_'])
        while not self.stop_event.is_set():
            for message in consumer:
                print(message)
                if self.stop_event.is_set():
                    break
        consumer.close()

def main():
    tasks = [
        Producer(),
        Consumer()
    ]
    for t in tasks:
        t.start()
    time.sleep(10)
    for task in tasks:
        task.stop()
    for task in tasks:
        task.join()

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()
This concludes the client cert mutual authentication setup.
Please refer to the next blog post for instructions on how to set up appropriate ACLs for each topic and restrict it to one client cert.