张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 关于

Kafka分布式消息系统(SSL安全连接) - Part.6

2019-07-08 张子阳 分类: 分布式系统

在实际应用中,有时候需要将kafka集群公布在互联网上,以便接入数据,这样就带来了安全问题。显然,不能允许任意客户端都可以写入或者读取kafka中的数据。同时,还要避免数据在网络传输的过程中被截取。因此,就需要采用一定的安全策略。Kafka提供了好几种安全传输和认证的方式,本文将演示如何使用SSL/TSL来进行认证和加密传说。

这篇文章分为了三个部分:生成密钥/证书/CA、配置Kafka集群、配置并测试集群客户端。其中最主要的第一步,就是生成正确的证书。

生成密钥/证书/CA

生成 kafka1.keystore.jks

kafka1.keystore.jks 需要在集群的每台机器上单独生成,它包含了两个东西:

证书相当于一个ID,来表名自己是谁(区分不同的服务器)。此时,这个证书还没有被任何CA(Certificate Authority,证书颁发机构)所认证(进行签名)。证书被CA签名,也叫做被CA认证。证书可以通过下面两种方式签名:

  1. 先自己生成一个证书,再让CA签名。
  2. 直接从CA处申请一个,从CA处申请的自然已经由该CA签名过了。

可以使用java自带的keytool来生成keystore:

keytool -keystore kafka1.keystore.jks -alias kafka1 -validity 3650 -genkey -keyalg RSA

接下来需要输入一些信息。会让输入两个密码:密钥库(keystore)密码和密钥密码。可以都输入相同的,例如:852741。后面配置kafka的时候会用到。

这里还有其他的一些信息,其中最重要的是Common Name(CN),输入主机名:kafka1;如果主机绑定了域名,则输入域名:kafka1.domain.com。

这里的kafka1是服务器的主机名,如果有多台,则按kafka2、kafka3...依次命名下去。

此时,kafka1.keystore.jks的结构如下图:

kafka1.keystore.jks结构
对于kafka集群内的其他服务端和客户端,重复上面的步骤,生成kafka2.keystore.jks、kafka3.keystore.jks、client1.keystore.jks ...

生成 CA

通常情况下,证书是要去CA申请的,但是如果仅是自家服务端和客户端(也是个服务器)用,而非面向互联网的客户端(例如浏览器),那么可以自己生成一个CA。然后让这个CA去给证书签名。

我们这里采用的是上一小节说的方式1:先自己创建证书,再自己创建CA,再让CA去签名证书。

使用openssl工具来生成一个CA:

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650

这里会要求输入PEM密码,妥善保管这个密码,后面会继续用到。上面的命令会生成两个文件:ca-key和ca-cert。

创建 truststore.jks

truststore包含了所有可以信赖的CA。下面的脚本创建了一个truststore,它包含了上面生成的ca-cert。

keytool -keystore ca.truststore.jks -alias CARoot -import -file ca-cert

truststore保存在客户端/服务端本地:

这样,当客户端与服务端交互时,客户端只要检查服务端证书,确认该证书是由truststore中的CA颁发的,就认为服务端是可信的。
同理,当服务端与客户端交互时,服务端只要检查客户端证书,确认该证书是由truststore中的CA颁发的,就认为客户端是可信的。

如果客户端证书颁发自不同的CA,那么就要将客户端的证书,或者颁发客户端证书的CA,加入到服务端的truststore中(反之亦然)。

这里有一个信任链的关系:可以信任某一个具体的证书;也可以信任证书的颁发机构,即CA,当信任CA时,就会信任所有由该CA颁发的证书。

显然,在内部的服务端/客户端体系中,所有的服务器只要信任同一个CA,使用同一个truststore就可以了(那么所有持有由该CA签名的证书的服务器就都是可信的了)。同时,所有的服务端/客户端证书,均由此CA进行签名。

ca.truststore.jks结构

对 keystore 进行签名

第1步生成的kafka1.keystore.jks 是没有任何CA进行签名的。虽然不对其进行CA签名,也可以加入到Truststore中,以进行信任,但这样每次增减服务器,都需要更新truststore,显然不方便。更好的方式就是对其进行CA签名,然后Truststore中只要包含CA的证书就可以了。

简言之:对于需要进行SSL通信的服务器,不论是客户端还是服务端,均使用CA对其keystore进行签名。

对keystore签名的第一步,从keystore中导出未签名的证书:

keytool -keystore kafka1.keystore.jks -alias kafka1 -certreq -file cert-kafka1

接下里,使用CA对上一步导出的cert-kafka1进行签名:

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-kafka1 -out cert-signed-kafka1 -days 3650 -CAcreateserial -passin pass:29372028

这里的pass:29372028,就是第2步里填写的PEM密码。上面的命令会生成cert-signed-kafka1文件,它是由CA签名过的证书。

将CA证书和签名过的证书导入到keystore:

keytool -keystore kafka1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka1.keystore.jks -alias kafka1 -import -file cert-signed-kafka1
CA签名后kafka1.truststore.jks结构

最终的结构如下图所示:

最终的集群/客户端结构图

配置服务端

修改Kafka安装目录下的config/server.properties:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093

################ SSL连接设置 ############
ssl.keystore.location=/opt/kafka_2.11-2.1.1/ssl/kafka1.keystore.jks
ssl.keystore.password=852741
ssl.key.password=852741
ssl.truststore.location=/opt/kafka_2.11-2.1.1/ssl/ca.truststore.jks
ssl.truststore.password=852741
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.client.auth=required
ssl.secure.random.implementation=SHA1PRNG

这里的几个选项都比较重要,listeners指定了Kafka将在哪些IP地址上侦听哪些端口。我们期望的情况是局域网内通过PLAINTEXT,也即明文进行传输;互联网上通过SSL,也即密文传输。所以,需要填写两组地址。然后通过防火墙控制,对于外网(互联网),只开放9093端口号(只能通过SSL连接)。

ssl.client.auth=required,这个选项则要求客户端也必须持有经过CA签名的证书(默认是客户端验证服务端,但是服务端不验证客户端)。

还有一个没有配置的选项:security.inter.broker.protocol=SSL,开启这个则broker之间进行数据交换也使用SSL,因为集群内的所有Broker都位于局域网内,所以这个选项我没有开启,这也是上面为什么listeners要同时配置PLAINTEXT和SSL的原因。

advertised.listeners这里是返回给客户端的kafka连接地址,因此,如果客户端位于互联网上的其他地区。那么这里必须是服务器的公网域名或者IP。这里我写成了kafka1,那么客户端在连接时就必须配置hosts。

进入kafka安装目录,运行服务端:

bin/kafka-server-start.sh -daemon config/server.properties

可以通过下面的命令来验证SSL连接是否运行正常:

openssl s_client -debug -connect localhost:9093 -tls1

配置客户端

这里的客户端是要部署通过互联网上,通过SSL来访问kafka集群。测试起见,我们使用的是Kafka自带的控制台客户端。

配置hosts

# 在客户端配置hosts
kafka1 220.181.38.148

因为在上面服务端的advertised.listeners配置中,我填写的是kafka1,所以需要配置hosts。如果advertised.listeners填写的是服务端的公网域名或者IP,则不需要配置。

创建client-ssl.properties

在Kafka安装目录下的config目录下创建client-ssl.properties文件,内容如下:

security.protocol=SSL
ssl.keystore.location=/opt/kafka_2.11-2.1.1/ssl/client.keystore.jks
ssl.keystore.password=852741
ssl.truststore.location=/opt/kafka_2.11-2.1.1/ssl/ca.truststore.jks
ssl.truststore.password=852741
ssl.key.password=852741
这里的client.keystore.jks和上面服务端的生成方式一模一样,并且使用相同的CA。

client.keystore.jsk生成的脚本如下:

keytool -keystore client.keystore.jks -alias client -validity 3650 -genkey -keyalg RSA; 
keytool -keystore client.keystore.jks -alias client -certreq -file cert-client;
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-client -out cert-signed-client -days 3650 -CAcreateserial -passin pass:29372028;
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert;
keytool -keystore client.keystore.jks -alias client -import -file cert-signed-client;

测试运行

全部配置好了以后,在客户端执行下面的命令,创建一个Topic:test-20190708,并写入几行数据:

$ bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic test-20190708 --producer.config config/client-ssl.properties
>5
>6
>7
>8

使用下面的命令从Topic中读取数据:

$ bin/kafka-console-consumer.sh --bootstrap-server kafka1:9093 --topic test-20190708 --consumer.config config/client-ssl.properties --from-beginning
2
4
1
3

如果将上面命令中的config/client-ssl.properties取消掉,则会报错,这里就不再演示了。

总结

这篇文章简要介绍了如何配置kafka的SSL连接,先讲了如何生成密钥和证书文件,接下来分别在服务端和客户端进行了证书配置,最后,使用控制台程序演示了使用ssl进行连接。

更多的参考资料可以查看官网资料:

感谢阅读,希望这篇文章能给你带来帮助!