Kafka分布式消息系统(通过控制台访问) - Part.4
在前面的三篇文章中,介绍了Kafka的相关概念,并且搭建了一个3节点的Kafka和Zookeeper集群。接下来,就可以对集群进行访问。访问的方式通常为各种编程语言的Kafka库,或者是使用命令行脚本。各种语言的编程库,可以在这个页面获得:https://cwiki.apache.org/confluence/display/KAFKA/Clients。这篇文章主要讲述如何通过命令行来访问Kafka集群,包括查看所有Topic(主题)、创建Topic、查看Topic详情、写入数据到Topic、读取Topic内容,以及删除Topic。
本文假设你已经搭建好了kafka集群和zookeeper集群,主机名分别为kafka1、kafka2、kafka3(zookeeper1、zookeeper2、zookeeper3)。只有一台主机也是可以的,其名称为kafka1/zookeeper1。
bin目录脚本
建议将kafka的默认目录设置为环境变量:KAFKA_HOME,以后再访问时方便一些,只要 cd $KAFKA_HOME就可以了(实际安装目录可能为/opt/kafka_2.11-1.1.0)。本文如无特别说明,所有的操作都是在$KAFKA_HOME文件夹下执行的。
在bin目录下包含了很多默认脚本。除了前面接触过的,启动和停止kafka、zookeeper集群的 kafka-server-start.sh、kafka-server-stop.sh、zookeeper-server-start.sh、zookeeper-server-stop.sh,还有其他二十几个。如果想要了解每个脚本的用途和用法,只需要不带参数执行一下就可以了,例如执行 bin/kafka-console-producer.sh:
# bin/kafka-console-producer.sh
Read data from standard input and publish it to Kafka. Option Description ------ ----------- --batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. (default: 200) --broker-list <String: broker-list> REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2. ... 以下省略100行
上面的第一句话描述了该脚本的用途:Read data from standard input and publish it to Kafka.(从标准输入读取数据并发布到Kafka中)。下面则列出了必须和可选的参数。接下来,我们先来看下如何创建一个topic。
创建topic
使用kafka-topics.sh可以创建一个名为 test.user 的Topic:
# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test.user --replication-factor 2 --partitions 6
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "test.user".
主要的几个参数含义如下:
- --zookeeper:zookeeper的位置,kafka使用zookeeper来记录Broker和Topic的信息,因此当操作Topic时,需要指定zookeeper的位置。后面的/kafka是保存kafka相关信息的文件夹路径。zookeeper是以树状的结构来保存信息的。
- --replication-factor:副本数,默认值和kafka的配置相关,这里我设置为2。
- --partitions:分区数,因为有3节点,一般设置为节点数x2,因此这里设置为6。
这里有一个警告,意思是Topic的名称要么使用'.',要么使用'_',但是不要两个都用,否则可能出现名称冲突问题。因为本处只使用了'.',因此可以不用理会。
查看topic详情
kafka-topics.sh 同样可以用来查看刚刚创建的topic:
# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test.user --describe
Topic:test.user PartitionCount:6 ReplicationFactor:2 Configs: Topic: test.user Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: test.user Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test.user Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test.user Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2 Topic: test.user Partition: 4 Leader: 1 Replicas: 1,3 Isr: 1,3 Topic: test.user Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
可以看到test.user的主要信息都罗列了出来,包括分区数、副本数、Leader等。
查看topic列表
使用下面的命令,可以查看到所有的主题列表:
# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list
__consumer_offsets cdc.tgstat_ddztest.test_spark connect-configs-aw connect-offsets-aw connect-status-aw ... 省略几十行 test.user
这里面会看到一些特殊的topic:__consumer_offsets,可以称作“系统topic”,用来记录Consumer提交的offsets,可见kafka也利用自身来记录一些系统数据。
connect-configs-aw、connect-offsets-aw、connect-status-aw是kafka connect组件用到的3个topic,也是“系统topic”,分别记录配置、偏移量和状态。
在最后一行,看到了我们前面创建的test.user主题。在我的主机上,还有几十个其他的topic,这里进行了省略。如果你是在一个新的集群上做测试,那么应该只有__consumer_offsets和test.user两个topic。
读/写 topic
创建好了topic之后,就可以对topic进行读写,此时需要同时打开两个控制台,一个用于写,一个用于读。
控制台(写)的命令如下:
# bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test.user
>
此时会进入交互模式,光标位于“>”右侧,可以输入文本,按下回车后,输入的内容将会发布到kafka集群的test.user主题当中。
控制台(读)的命令如下:
# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test.user --from-beginning
此时,如果topic中包含有数据,会将数据输出到控制台,直到队列的末尾,然后等待新数据;如果没有数据(我们当前的情况),那么就会直接等待新数据。
现在在控制台(写)当中输入文本,可以在控制台(读)中立即看到。如下图所示:
值得注意的是上面的次序,因为1~6这几个数我输入的非常快,又因为有6个partition,因此这6条消息会发送到不同的partition当中,同一个partition中的消息是有序的,不同partition中的消息是无序的。因此,可以看到消息的顺序在写入端和读取端是不一致的。
删除topic
使用下面的命令,删除topic:
# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --delete --topic test.user
Topic test.user is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
上面的提示是说:只有当delete.topic.enable开启时,删除topic才有效。而这个delete.topic.enable选项,是在搭建kafka集群时设置的,这里可以参看前面的文章。
删除完成后,再使用前面的命令查看该topic,会看到控制台返回空,说明没有test.user这个topic了。
至此,我们就完成了对topic的最常见操作:创建、删除、查看,以及读取和写入数据。使用控制台的方式进行操作,可以对集群的状态进行快速的验证,非常的方便。在实际生产中,还是通过各种语言的kafka库来进行访问。对集群进行查看,除了使用控制台,也可以通过第三方的一些UI工具,例如Yahoo的Kafka-Manager、Landoop的Kafka-topics-ui等。这个以后再介绍。
感谢阅读,希望这篇文章能给你带来帮助!