张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 关于

Kafka分布式消息系统(搭建Kafka集群) - Part.3

2018-06-26 张子阳 分类: 分布式系统

在前面两篇文章中,我们了解了基本概念,也安装、配置好了zookeeper集群,在这篇文章中,我们将一步步搭建kafka集群。

《搭建zookeeper集群》 中,我们已经配置了系统环境,并且下载、解压了kafka安装包,所以这篇文章更多的是配置。kafka和zookeeper一样,都是使用java开发的,之前java环境已经安装过了,所以这篇文章也不再演示。如果你还没有看过上一篇,那么建议先看 上一篇

配置Kafka

配置文件打开数目

kafka为了提高吞吐量,会进行打开大量文件进行读写操作,需要配置系统支持更多的文件打开数量:

# echo "* hard nofile 100000 * soft nofile 100000" | tee --append /etc/security/limits.conf

创建数据文件夹

类似地,我们创建/data/kafka文件夹,用于存储kafka流数据。

# mkdir -p /data/kafka; \ chown -R root:root /data/kafka;

为了更好的性能,这个文件夹应该挂载的独立的磁盘上。

配置config/server.properties

和zookeeper类似,kafka的配置也位于 config 文件夹中,使用cat命令可以查看默认的配置:

如无特殊说明,后续操作均位于kafka的安装目录:/opt/kafka/kafka_2.11-1.1.0

# cd /opt/kafka/kafka_2.11-1.1.0; \ cat config/server.properties

输出如下(做了简化):

############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from #listeners=PLAINTEXT://:9092 # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=0

先删除旧的配置文件,rm -rf config/server.properties,然后使用vim编辑器,新建一个,并输入以下内容:

################### 基础设置 ##################### # 服务器Id,必须为唯一数字。三台服务器分别设置为1、2、3 broker.id=1 # 这里非常重要,注意kafka1是在上篇文章中讲DNS时配置的。kafka的客户端也要配置相同的DNS,否则这里就要改为IP advertised.listeners=PLAINTEXT://kafka1:9092 # 允许删除主题,默认是false delete.topic.enable=true ################### 日志设置 ###################### # 设置保存日志的位置 log.dirs=/data/kafka # 默认的Partition数量,建议为台数x2,我们是3台,因此这里填6;如果只有1台,则填2 num.partitions=6 # 集群情况下,这里建议填2,单台填1,填的越多集群速度越慢(需要同步数据),但是容错性越高 default.replication.factor=2 ################# 配置数量 ###################### #集群情况下,以下都填2。如果只有1台,则填1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=2 transaction.state.log.min.isr=2 # 单台填1,集群填2。如果单台填2,则无法启动kafka min.insync.replicas=2 ################ 日志保存策略 ############### # 日志过期时间,168小时,也就是一周 log.retention.hours=168 # 最大segment的尺寸,1GB log.segment.bytes=1073741824 # log检查间隔时间 log.retention.check.interval.ms=300000 #################### Zookeeper ################# # zookeeper连接,注意后面带一个/kafka,这样就不会占用 zookeeper的根目录 # 注意这里zookeeper1、zookeeper2、zookeeper3 也是上一节DNS中配置过的 zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka # 连接zookeeper的超时时间 zookeeper.connection.timeout.ms=6000 ################## 其他 ################# # 自动创建主题 auto.create.topics.enable=true # 消息大小(默认是1MB,这里改成100MB,相当于不限制消息大小) replica.fetch.max.bytes=104857600 message.max.bytes=104857600

在kafka中,持久化的数据不叫数据,叫日志。和我们传统概念上理解的日志不同,需要注意一下,否则会被搞晕。上面的配置的日志部分,其实就是数据部分。

启动kafka

配置完成后,和zookeeper类似,我们通过bin目录下的脚本进行启动:

以前台进程的模式启动:

# bin/kafka-server-start.sh config/server.properties

以后台服务的模式启动:

# bin/kafka-server-start.sh -daemon config/server.properties

启动后又会看到两屏的日志信息,注意寻找Error,如果只有Info,可以初步认为已经成功运行了:

以上省略100行... INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null alter.log.dirs.replication.quota.window.num = 11 alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = 0 broker.id.generation.enable = true broker.rack = null compression.type = producer connections.max.idle.ms = 600000 controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 30000 create.topic.policy.class.name = null default.replication.factor = 1 delegation.token.expiry.check.interval.ms = 3600000 delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null delegation.token.max.lifetime.ms = 604800000 delete.records.purgatory.purge.interval.requests = 1 以下省略200行...

查看kafka启动日志

运行起来后,其中有一段很整齐的输出,如上所示,为kafka的配置。

和zookeeper一样,当使用服务进程的方式后台运行,可以通过以下命令查看启动日志:

# cat logs/server.log

验证安装

可以使用两种方式来验证kafka的安装,一种是使用linux的系统命令:

# ps aux | grep kafka.Kafka

一种是使用nc工具,在三台服务器上分别验证各自是否运行正常:

# nc -vz kafka1 9092 # nc -vz kafka2 9092 # nc -vz kafka3 9092

当正常时,会返回类似下面这样的结果:

Ncat: Version 7.50 ( https://nmap.org/ncat ) Ncat: Connected to 192.168.0.31:9092. Ncat: 0 bytes sent, 0 bytes received in 0.02 seconds.

连接失败时,则返回connection refused.

停止kafka

和zookeeper一样简单,没有参数,一条语句即可:

# bin/kafka-server-stop.sh

至此,kafka单台的配置就完成了。需要在其余两台机器上重复这些操作,注意的事,当三台都配置好之后,务必修改config/server.properties,然后重新启动kafka。修改的方式已经写在上面的注释中了。

感谢阅读,希望这篇文章能给你带来帮助!