Kafka分布式消息系统(SSL客户端) - Part.7
在上一篇 Kafka分布式消息系统(SSL安全连接) 中,我们使用了kafka自带的控制台客户端进行了测试,使用SSL进行连接,并发送/接收了数据。在实际项目中,我们通常会使用熟悉的编程语言来开发客户端。kafka官方提供了多种语言的客户端,包括Go、C#、Java、等。具体的地址在这里:
- https://cwiki.apache.org/confluence/display/KAFKA/Clients
- https://docs.confluent.io/current/clients/index.html
这篇文章,将使用C#语言来开发一个Kafka的客户端,并通过SSL连接Kafka服务器,发送并接收数据。使用的类库以及库的说明文档分别是:
- https://github.com/confluentinc/confluent-kafka-dotnet
- https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.html
密钥库格式转换
在上一篇文章中,使用Java自带的keytool生成了client.keystore.jks。它的格式为JKS,但不同的客户端类库可能采用的是不同的密钥库格式。例如,本文的C#客户端就只支持另一种常见的密钥库格式:PKCS12。
在服务端可以使用ssl.keystore.type配置密钥库格式,默认是JKS。但是客户端类库可能并不支持指定格式,并且只支持某一种格式,比如说本文的C#客户端。
使用keytool工具来对密钥格式进行转换:
keytool -importkeystore -srckeystore client.keystore.jks -srcstoretype JKS -deststoretype PKCS12 -destkeystore client.keystore.p12
这样会生成一个PKCS12格式的密钥库,文件名是:client.keystore.p12。
编写C#客户端代码
接下来编写客户端代码就是非常简单的了,主要是配置SSL连接的参数:密钥库、密钥库密码、密钥密码、证书等。这些配置和使用命令行客户端时基本类同,唯一的差别是:C#客户端不使用truststore,而是直接使用CA证书(命令行客户端将CA证书导入到了truststore中)。
直接上代码:
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; namespace kafka.client { class Program { static void Main (string[] args) { //RunProducer (); RunConsumer(); } static ClientConfig GetConfig () { var sslroot = "/Users/zhangzy/code/dotnet/kafka客户端/kafka.client/ssl"; return new ClientConfig { BootstrapServers = "kafka1:9093", SecurityProtocol = SecurityProtocol.Ssl, SslKeystoreLocation = sslroot + "/client.keystore.p12", SslKeystorePassword = "852741", SslKeyPassword = "852741", SslCaLocation = sslroot + "/ca-cert" }; } static void RunProducer () { var config = GetConfig (); using (var p = new ProducerBuilder<Null, string> (config).Build ()) { try { var dr = p.ProduceAsync ("test-topic", new Message<Null, string> { Value = "test3" }).Result; Console.WriteLine ($"Delivered '{dr.Value}' to '{dr.Topic}/{dr.Partition}/{dr.Offset}'"); } catch (ProduceException<Null, string> e) { Console.WriteLine ($"Delivery failed: {e.Error.Reason}"); } } } static void RunConsumer () { var clientConfig = GetConfig (); var config = new ConsumerConfig (clientConfig); config.GroupId = "test-consumer-1"; config.AutoOffsetReset = AutoOffsetReset.Earliest; using (var c = new ConsumerBuilder<Ignore, string> (config).Build ()) { c.Subscribe ("test-topic"); CancellationTokenSource cts = new CancellationTokenSource (); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel (); }; try { while (true) { try { var cr = c.Consume (cts.Token); Console.WriteLine ($"Consumed message '{cr.Value}' at: '{cr.Topic}/{cr.Partition}/{cr.Offset}'."); } catch (ConsumeException e) { Console.WriteLine ($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close (); } } } } }
一切顺利的话,应当可以看到类似下面的输出:
RunProducer():
dotnet run Delivered 'test3' to 'test-topic/[0]/5'
RunConsumer():
dotnet run Consumed message 'test3' at: 'test-topic/[0]/5'.
安装 librdkafka
Kafka的.Net包实际上是基于librdkafka包的一个轻量级包装,如果上面的代码运行出错,并且错误代码中含有librdkafka关键字,很可能就是没有安装这个包。
可以通过源码安装librdkafka,源码地址是:https://github.com/edenhill/librdkafka
也可以通过yum工具安装,命令如下:
rpm --import https://packages.confluent.io/rpm/5.1/archive.key cp confluent.repo /etc/yum.repos.d yum clean all yum install -y librdkafka-devel
其中confluent.repo文件内容如下:
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1
至此,就完成了Kafka客户端的SSL连接和测试。
感谢阅读,希望这篇文章能给你带来帮助!