张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 店铺 关于

Kafka分布式消息系统(SSL客户端) - Part.7

2019-7-9 作者: 张子阳 分类: 分布式系统

在上一篇 Kafka分布式消息系统(SSL安全连接) 中,我们使用了kafka自带的控制台客户端进行了测试,使用SSL进行连接,并发送/接收了数据。在实际项目中,我们通常会使用熟悉的编程语言来开发客户端。kafka官方提供了多种语言的客户端,包括Go、C#、Java、等。具体的地址在这里:

这篇文章,将使用C#语言来开发一个Kafka的客户端,并通过SSL连接Kafka服务器,发送并接收数据。使用的类库以及库的说明文档分别是:

密钥库格式转换

在上一篇文章中,使用Java自带的keytool生成了client.keystore.jks。它的格式为JKS,但不同的客户端类库可能采用的是不同的密钥库格式。例如,本文的C#客户端就只支持另一种常见的密钥库格式:PKCS12。

在服务端可以使用ssl.keystore.type配置密钥库格式,默认是JKS。但是客户端类库可能并不支持指定格式,并且只支持某一种格式,比如说本文的C#客户端。

使用keytool工具来对密钥格式进行转换:

keytool -importkeystore -srckeystore client.keystore.jks -srcstoretype JKS -deststoretype PKCS12 -destkeystore client.keystore.p12

这样会生成一个PKCS12格式的密钥库,文件名是:client.keystore.p12。

编写C#客户端代码

接下来编写客户端代码就是非常简单的了,主要是配置SSL连接的参数:密钥库、密钥库密码、密钥密码、证书等。这些配置和使用命令行客户端时基本类同,唯一的差别是:C#客户端不使用truststore,而是直接使用CA证书(命令行客户端将CA证书导入到了truststore中)。

直接上代码:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace kafka.client {
    class Program {

        static void Main (string[] args) {
            //RunProducer ();
            RunConsumer();
        }

        static ClientConfig GetConfig () {
            var sslroot = "/Users/zhangzy/code/dotnet/kafka客户端/kafka.client/ssl";

            return new ClientConfig {
                BootstrapServers = "kafka1:9093",
                    SecurityProtocol = SecurityProtocol.Ssl,
                    SslKeystoreLocation = sslroot + "/client.keystore.p12",
                    SslKeystorePassword = "852741",
                    SslKeyPassword = "852741",
                    SslCaLocation = sslroot + "/ca-cert"
            };
        }

        static void RunProducer () {
            var config = GetConfig ();

            using (var p = new ProducerBuilder<Null, string> (config).Build ()) {
                try {
                    var dr = p.ProduceAsync ("test-topic", new Message<Null, string> { Value = "test3" }).Result;
                    Console.WriteLine ($"Delivered '{dr.Value}' to '{dr.Topic}/{dr.Partition}/{dr.Offset}'");
                } catch (ProduceException<Null, string> e) {
                    Console.WriteLine ($"Delivery failed: {e.Error.Reason}");
                }
            }
        }

        static void RunConsumer () {
            var clientConfig = GetConfig ();
            var config = new ConsumerConfig (clientConfig);
            config.GroupId = "test-consumer-1";
            config.AutoOffsetReset = AutoOffsetReset.Earliest;

            using (var c = new ConsumerBuilder<Ignore, string> (config).Build ()) {
                c.Subscribe ("test-topic");

                CancellationTokenSource cts = new CancellationTokenSource ();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel ();
                };

                try {
                    while (true) {
                        try {
                            var cr = c.Consume (cts.Token);
                            Console.WriteLine ($"Consumed message '{cr.Value}' at: '{cr.Topic}/{cr.Partition}/{cr.Offset}'.");
                        } catch (ConsumeException e) {
                            Console.WriteLine ($"Error occured: {e.Error.Reason}");
                        }
                    }
                } catch (OperationCanceledException) {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close ();
                }
            }
        }
    }
}

一切顺利的话,应当可以看到类似下面的输出:

RunProducer():

dotnet run
Delivered 'test3' to 'test-topic/[0]/5'

RunConsumer():

dotnet run
Consumed message 'test3' at: 'test-topic/[0]/5'.

安装 librdkafka

Kafka的.Net包实际上是基于librdkafka包的一个轻量级包装,如果上面的代码运行出错,并且错误代码中含有librdkafka关键字,很可能就是没有安装这个包。

可以通过源码安装librdkafka,源码地址是:https://github.com/edenhill/librdkafka

也可以通过yum工具安装,命令如下:

rpm --import https://packages.confluent.io/rpm/5.1/archive.key
cp confluent.repo /etc/yum.repos.d
yum clean all
yum install -y librdkafka-devel

其中confluent.repo文件内容如下:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

至此,就完成了Kafka客户端的SSL连接和测试。

感谢阅读,希望这篇文章能给你带来帮助!