找回密码
 注册
查看: 410|回复: 1

Kafka 快速搭建

[复制链接]

1

主题

0

回帖

12

积分

管理员

积分
12
QQ
发表于 2022-12-30 09:08:01 | 显示全部楼层 |阅读模式
在了解完 Kafka 的基本概念之后,我们通过搭建 Kafka 集群来进一步深刻认识一下 Kafka。
确保安装环境安装 Java 环境
在安装 Kafka 之前,先确保Linux 环境上是否有 Java 环境,使用 java -version 命令查看 Java 版本,推荐使用Jdk 1.8 ,如果没有安装 Java 环境的话,可以按照这篇文章进行安装(https://www.cnblogs.com/zs-notes/p/8535275.html
安装 Zookeeper 环境
Kafka 的底层使用 Zookeeper 储存元数据,确保一致性,所以安装 Kafka 前需要先安装 Zookeeper,Kafka 的发行版自带了 Zookeeper ,可以直接使用脚本来启动,不过安装一个 Zookeeper 也不费劲
Zookeeper 单机搭建
Zookeeper 单机搭建比较简单,直接从 https://www.apache.org/dyn/closer.cgi/zookeeper/ 官网下载一个稳定版本的 Zookeeper ,这里我使用的是 3.4.10,下载完成后,在 Linux 系统中的 /usr/local 目录下创建 zookeeper 文件夹,使用xftp 工具(xftp 和 xshell 工具都可以在官网 https://www.netsarang.com/zh/xshell/ 申请免费的家庭版)把下载好的 zookeeper 压缩包放到 /usr/local/zookeeper 目录下。
如果下载的是一个 tar.gz 包的话,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz解压即可
如果下载的是 zip 包的话,还要检查一下 Linux 中是否有 unzip 工具,如果没有的话,使用 yum install unzip 安装 zip 解压工具,完成后使用 unzip zookeeper-3.4.10.zip 解压即可。
解压完成后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10 ,创建一个 data 文件夹,然后进入到 conf 文件夹下,使用 mv zoo_sample.cfg zoo.cfg 进行重命名操作
然后使用 vi 打开 zoo.cfg ,更改一下dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data ,保存。
进入bin目录,启动服务输入命令 ./zkServer.sh start 输出下面内容表示搭建成功
关闭服务输入命令,./zkServer.sh stop
使用 ./zkServer.sh status 可以查看状态信息。
Zookeeper 集群搭建准备条件
准备条件:需要三个服务器,这里我使用了CentOS7 并安装了三个虚拟机,并为各自的虚拟机分配了1GB的内存,在每个 /usr/local/ 下面新建 zookeeper 文件夹,把 zookeeper 的压缩包挪过来,解压,完成后会有 zookeeper-3.4.10 文件夹,进入到文件夹,新建两个文件夹,分别是 data 和 log 文件夹
注:上一节单机搭建中已经创建了一个data 文件夹,就不需要重新创建了,直接新建一个 log 文件夹,对另外两个新增的服务需要新建这两个文件夹。
设置集群
新建完成后,需要编辑 conf/zoo.cfg 文件,三个文件的内容如下
1
, `& @: N, a6 U. Y- [2& ]$ g* m( w; V8 u
3
2 `( W$ A8 F! v9 ?3 Q7 |" H4
* w. z7 b3 d0 O1 X; |- }4 j' n: z. ?5
6 e4 u3 m! E4 I3 r/ v8 X; W6( \! N4 V; i8 R' @9 H6 B% R3 D$ B
7
1 \7 t  f& w3 r' {& p2 r# g8/ O! ~) L5 M( e1 x1 s9 K) Y' e
9
tickTime=2000initLimit=10syncLimit=5dataDir=/usr/local/zookeeper/zookeeper-3.4.10/datadataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/logclientPort=12181server.1=192.168.1.7:12888:13888server.2=192.168.1.8:12888:13888server.3=192.168.1.9:12888:13888

; i3 |9 D  u3 K2 ?
, Y/ e- ^, q1 Q
server.1 中的这个 1 表示的是服务器的标识也可以是其他数字,表示这是第几号服务器,这个标识要和下面我们配置的 myid 的标识一致可以。
192.168.1.7:12888:13888 为集群中的 ip 地址,第一个端口表示的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口,默认是 3888
现在对上面的配置文件进行解释
tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
syncLimit: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
dataDir: 快照日志的存储路径
dataLogDir: 事务日志的存储路径,如果不配置这个那么事务日志会默认存储到dataDir指定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事务日志、快照日志太多
clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
创建 myid 文件
在了解完其配置文件后,现在来创建每个集群节点的 myid ,我们上面说过,这个 myid 就是 server.1 的这个 1 ,类似的,需要为集群中的每个服务都指定标识,使用 echo 命令进行创建
1
1 G! }. N) ^3 C( @2 H2& I# W  U0 p0 H& |+ J% D- M- L- ~2 v
3
' A: F' r/ w. z- d4 r# f' K" S  t4* h2 l2 X9 [4 ^; t+ q4 f3 C: g
5
/ W; y" O! I7 i; x6
# server.1echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid# server.2echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid# server.3echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

  Q# j. `0 F9 f: |
5 G' A0 y0 v; D* V* H  V: Q% y0 f
启动服务并测试
配置完成,为每个 zk 服务启动并测试,我在 windows 电脑的测试结果如下
启动服务(每台都需要执行)
1
% B6 g+ b1 Y4 H( S& A$ F: }2
cd /usr/local/zookeeper/zookeeper-3.4.10/bin./zkServer.sh start
6 y- I0 H8 g) q
3 W& |. ?: l1 p; F% f1 o
检查服务状态
使用 ./zkServer.sh status 命令检查服务状态
192.168.1.7 — follower
192.168.1.8 — leader
192.168.1.9 — follower
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。
Kafka 集群搭建准备条件
在 /usr/local 下新建 kafka 文件夹,然后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用 tar -zxvf 压缩包 进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下
我们可以看到有很多 properties 配置文件,这里主要关注 server.properties 这个文件即可。
kafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(可以按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是通过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动
修改配置项
需要为每个服务都修改一下配置项,也就是server.properties, 需要更新和添加的内容有
1
! ]" s6 |: @0 H& m4 i20 A# X* @  W$ g- U' t& W$ F
3
) I0 X: q3 _. Q- L  ?, b& K4
$ `3 j! g) q8 ^; j% I" V57 ~, k+ S6 o, G3 K% u) E0 F
6& c/ U& X9 F2 `' l3 w
7
2 b- Z5 D# k/ q# J8* s# C1 p, A$ a4 J  L. p0 j
9* y" d# \0 Z" h
10
broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log#在log.retention.hours=168 下面新增下面三项message.max.byte=5242880default.replication.factor=2replica.fetch.max.bytes=5242880#设置zookeeper的连接端口zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

% V% U" B: V' x2 x& ~8 p( c

. v/ l0 h6 t& T6 H; T+ P4 P
配置项的含义
1) x' }8 T, U3 Z& h1 P
2. @# h) T/ f% y
3
; Y0 g/ X9 H9 K- u( K1 x7 H% t0 S4
* w! j% C/ `5 O9 M; A5& y. h- b5 D7 K- |
6% |$ R: Y3 }+ I# ^/ _' ~: R
7
9 K0 k% n! Q. v- i$ b3 w87 i' R3 f" K! O& X" @
99 C, u3 a/ q, A5 A+ F
10
$ B; k) B; M1 |$ T  q' ~" q3 t& E11
' C/ j0 I' V0 P! T5 a9 l& E121 b& S1 O; i9 i+ a( _% {
13
+ j2 r& e- k* t- i3 y14
6 [3 @' @& I1 I2 }15) G7 k! B7 E( T# C& u0 @
16
3 H: d( e; }' J- v" m. k17( u, C& _5 @# {( w  _; t
18
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样port=9092 #当前kafka对外提供服务的端口默认是9092host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。num.network.threads=3 #这个是borker进行网络处理的线程数num.io.threads=8 #这个是borker进行I/O处理的线程数log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小num.partitions=1 #默认的分区数,一个topic默认1个分区数log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天message.max.byte=5242880  #消息保存的最大值5Mdefault.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务replica.fetch.max.bytes=5242880  #取消息的最大直接数log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
, u; `; v/ P& [- a+ [
9 }& L/ W7 a6 A7 L8 a) s
启动 Kafka 集群并测试
  • 启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下
    ' j/ j: e! E4 R3 {8 G$ Y; k& u6 E5 j
1  w* l2 Z/ X3 E1 ~& ^2 G0 x6 k2 n
2
# 启动后台进程./kafka-server-start.sh -daemon ../config/server.properties
8 f. H+ O- M* i8 H4 Z+ Q

3 k4 b+ P( s1 I& W; i: T
  • 检查服务是否启动, O; c7 S. u6 v- X0 R
1
  V7 v8 a. S3 A" g/ _2+ F) }3 z( y! X
30 F) e. I7 T$ G/ M) D: f
4
# 执行命令 jps6201 QuorumPeerMain7035 Jps6972 Kafka

. _" Q% z! K+ K" `+ R' `1 B% P, u

1 f5 \) @) F8 F# m/ m0 c
  • kafka 已经启动
  • 创建 Topic 来验证是否创建成功
    0 h2 W) L* V4 u# {4 @( O
1
/ X: w+ B$ h8 s1 d* b! q2
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
% y. W" |1 [9 n- A" z1 f2 }
# L( p! \. R5 X3 s' n: J
对上面的解释
–replication-factor 2 复制两份
–partitions 1 创建1个分区
–topic 创建主题
查看我们的主题是否出创建成功
1
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
+ {5 u4 }; U6 w- M% [

, }$ y0 x. f5 a7 Z, K
启动一个服务就能把集群启动起来
在一台机器上创建一个发布者
1# C* j7 m+ Z; z* Q4 a1 j
2
# 创建一个broker,发布者./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

- a$ C/ e& v' t7 r/ m+ j
+ X$ [! x5 L9 U+ c
在一台服务器上创建一个订阅者
1# n/ ~# V2 N  p
2
# 创建一个consumer, 消费者bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
) S9 k7 c8 F2 s/ s1 q

3 c/ B* P+ e$ P/ A! \% u
注意:这里使用 –zookeeper 的话可能出现 zookeeper is not a recognized option 的错误,这是因为 kafka 版本太高,需要使用 --bootstrap-server 指令
测试结果
发布
消费
其他命令
显示 topic
1
4 A1 T" V$ h& i  N5 q* F2
) g% F5 ]( S3 L& N/ k2 B3" q: m" V* W2 x( a8 v
4
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181# 显示cxuantopic
* J  K; U1 o! ]. S9 d! l/ A! @+ k- k' T

- R% f1 @$ k8 J- \0 ~" y
查看 topic 状态
1" x3 @$ |, K2 ~1 o/ C4 |. t0 d
22 ~$ }2 M# S* N
3) `9 M1 V$ I& q, L2 @
4/ {) K1 S( J  f- m% u: R. }
55 H+ d% j1 l7 u2 ^  `
6
5 Q$ j- @6 |% Y9 q7
' A3 ?+ h" |& w4 n: {% O5 J  ]; z8
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic# 下面是显示的详细信息Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2# 分区为为1  复制因子为2   主题 cxuantopic 的分区为0 # Replicas: 0,1   复制的为1,2
; K$ j- c, V/ L% c2 [
3 V# @1 j+ }$ t
Leader 负责给定分区的所有读取和写入的节点,每个节点都会通过随机选择成为 leader。
Replicas 是为该分区复制日志的节点列表,无论它们是 Leader 还是当前处于活动状态。
Isr 是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。
至此,kafka 集群搭建完毕。
验证多节点接收数据
刚刚我们都使用的是 相同的ip 服务,下面使用其他集群中的节点,验证是否能够接受到服务
在另外两个节点上使用
1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

( w: B; P! o5 S

2 V! S  b# b! I3 ^2 ]+ A7 d8 u* j
然后再使用 broker 进行消息发送,经测试三个节点都可以接受到消息。
配置详解
在搭建 Kafka 的时候我们简单介绍了一下 server.properties 中配置的含义,现在我们来详细介绍一下参数的配置和概念
常规配置
这些参数是 kafka 中最基本的配置
  • broker.id+ A% R8 Q$ C6 B: I+ b. o7 l- O
每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它可以被设置成其他任意整数,在集群中需要保证每个节点的 broker.id 都是唯一的。
  • port$ x$ d) H7 A8 c0 G$ u9 e. I
如果使用配置样本来启动 kafka ,它会监听 9092 端口,修改 port 配置参数可以把它设置成其他任意可用的端口。
  • zookeeper.connect6 G) w& Z! [  y
用于保存 broker 元数据的地址是通过 zookeeper.connect 来指定。 localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义如下:
hostname 是 zookeeper 服务器的服务名或 IP 地址
port 是 zookeeper 连接的端口
/path 是可选的 zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用跟路径
  • log.dirs4 w0 q) q* [: t, E$ ^& t7 O
Kafka 把消息都保存在磁盘上,存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据 “最少使用” 原则,把同一分区的日志片段保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。
  • num.recovery.threads.per.data.dir6 J: B' O; a" I8 f
对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段
服务器正常启动,用于打开每个分区的日志片段;
服务器崩溃后启动,用于检查和截断每个分区的日志片段;
服务器正常关闭,用于关闭日志片段
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。
  • auto.create.topics.enable
    : a& C+ \* T2 I8 U# x0 `+ I( j
默认情况下,Kafka 会在如下 3 种情况下创建主题
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户向主题发送元数据请求时
  • delete.topic.enable
    : e8 D" l  ^# w. p' Z% l
如果你想要删除一个主题,你可以使用主题管理工具。默认情况下,是不允许删除主题的,delete.topic.enable 的默认值是 false 因此你不能随意删除主题。这是对生产环境的合理性保护,但是在开发环境和测试环境,是可以允许你删除主题的,所以,如果你想要删除主题,需要把 delete.topic.enable 设为 true。
主题默认配置
Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数
  • num.partitions
    6 x7 p6 v; l" a/ @2 G
num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。
  • default.replication.factor; b3 K2 h' M$ x/ D
这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。
  • log.retention.ms1 D! l5 i' {# p
Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。
  • log.retention.bytes0 i; E5 ?6 r, V1 f7 q  @
另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。
  • log.segment.bytes0 J9 l% ]- E0 w1 m* p
上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
  • log.segment.ms: |  n, q# p* c, _% i5 j
上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。
  • message.max.bytes8 `9 q6 h/ A" c8 V7 w, ?0 V6 t8 _
broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值
这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

' v/ N( }+ Y2 j, y; j8 N- f1 C% R

1

主题

0

回帖

12

积分

管理员

积分
12
QQ
 楼主| 发表于 2022-12-30 09:09:23 | 显示全部楼层
1、软件下载1.1 kakfa 下载
地址:http://kafka.apache.org/downloads, w- }! t; W0 E1 G9 F# L( Z

- o' p1 G  W& u+ M7 g% ~
6 V: C% |1 u4 v8 @- G  |! Y

. ^* g. A. w( N, k  @: E% Z1.2 zookeeper 下载
(1)因为 kafka 要依赖于 zookeeper 做调度,kafka 中实际自带的有 kafka,但是一般建议使用独立的 zookeeper,方便后续升级及公用。

7 U3 k$ z' f2 d1 [
(2)下载地址:
* k0 z$ h9 B2 e7 G/ M% `' u
2 {% P. _* \. ^$ N- A# f& n3 X/ x' T
0 w" Y7 t  X% E# L. v/ M( j

8 a- |; ?7 E5 _( z9 a3 B/ n# k1.3 下载说明
文件都不大,zk 是 9m 多,kafka 是 50 多兆

- E# c5 ?) ?4 \6 N8 H% Z% z  ~

4 X2 N7 G3 I4 o# `' ]! x

% \4 c$ _+ S' S$ [( I4 ~6 {2、 kafka 单机部署及集群部署
**说明:**北游在本地弄了三台虚拟机,ip 分别为:
4 q' a6 t6 u" N1 @  O6 J
& Z. S" Z( w8 J" ^
$ X8 I5 }9 _) l. ~/ p  Q! j" S. U
192.168.85.158192.168.85.168192.168.85.178; r* E" i$ P8 B7 e+ V7 @; x

; K. W5 \* [# B+ z! L$ o8 Y9 Y( t' p+ }2 G3 U$ R+ V
, C4 D6 m- T" b0 N
1 i0 y  s- b8 r' g* y& k( T: A
5 _' w6 ~. S5 m) e. a" h
3 C) K- h2 Q7 P- }, B5 N
) F/ i+ [, b; ?& k' O& G+ Y! c
4 U' E2 F% S' ^% H% z) l" R

. s" W" @& u/ ~8 f+ h
1 c! A' x, T8 B( _复制代码& l# w: g/ K4 w( H. `

  V- e/ ]0 I/ ^$ {% K( C5 k+ N
: B; c8 ]2 F6 W( A, @) z8 S% t/ ^* j, Q

2 W% W4 J& X' A" n4 A6 u5 \  C8 @2 i2 q
: Y% _* `# P& Q9 r; {' @5 v& f9 t

- d8 t# A7 @* @/ w* T& }5 C& d. ~2.1 单机部署
(1)上传 jar 包,就不再新建用户了,直接在 root 账户下执行,将 kafka 和 zookeeper 的 tar 包上传到/root/tools 目录下。

  M- i0 j0 v2 E; ~; c
(2)解压

) \# @* F# \, I$ d$ \5 w) `/ @; E( {( S' I/ E) h

* _5 q* }; W: w- I% z3 L[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz [root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz  
' G6 L# _/ G6 f/ t# v" V
# G) O9 O# y2 h+ r0 B, ^1 i7 R5 O5 P& r" q! A

8 F8 ^& @; s0 M- a  \
% L. O- G' V) o8 c8 a% U, C0 X+ g8 ?  @/ y; s
# M' X& p; ?: _- E- D5 T! t* `0 I
. z, N) o8 q2 ^9 q: l& {
3 n) S5 g& c/ r. l: e

$ N6 o! j, m% U1 u( Q6 a* G/ o
- ?" k& x; u1 A  S8 V! F* k3 Y+ \# @复制代码: _, }! J6 a$ N3 `& @. B; k8 A

2 `# ^7 r* i; `" [
( Y, A9 d8 n( D, ^% h' `! }1 @! h2 D. {" w- Z

; e! g9 S' c7 Z2 S7 C( Q* {  f5 m
( f( D. [5 C- l) c. M0 X
6 D+ z' D" C" _$ a' c( L

5 U2 `/ X  W5 B
(3)配置 zookeeper 及启动

" \8 z5 r1 G8 A1 S( }- @" F
( J4 ^6 l3 Q) o6 b
. f/ ?4 Q7 F* U# \& r
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin#北游,首先创建个空文件夹,在接下来的配置文件中配置[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf[root@ruanjianlaowang158 conf]# cp zoo_sample.cfg  zoo.cfg [root@ruanjianlaowang158 conf]# vi  zoo.cfg #单机只改一个值,保存退出。#dataDir=/tmp/zookeeperdataDir=/root/tools/apache-zookeeper-3.5.7-bin/data4 r$ R; S* k+ B0 }- v# W* ~4 Y. M
#启动zookeeper[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin[root@ruanjianlaowang158 bin]# ./zkServer.sh  start: L, N5 W4 m) c5 X1 I1 c* K$ S
; R2 [9 f' Y' O0 N- @0 L! D  }8 M

  w; s+ [8 z; W- n6 v, ^
1 T2 H/ P$ {, ^0 m* g* d% z! b3 u  J5 Z

; `+ Z; V5 ?6 w) d  y+ y+ a3 g: R( d5 W& L

* t* B9 ~- A( z% A
2 y0 L% S1 s! u& v. _; v( T' ?
1 F8 \% n7 q. ~/ o4 x) T  g$ S! x. e: R
复制代码
( L' [% h6 e4 q  ?. Q( V! H/ u' q, ~( z3 w! M
7 [. f0 X" |/ p% z+ X+ E5 g2 ~
5 N  `. H' s& D2 V2 ]  X4 F

$ f( c# V. q+ E% ~" }, N) S8 S+ d, ^5 s1 q: ]

3 u; f6 V! X7 y6 r3 l: L/ Z: @, |  Z  j+ R( j) b

0 C/ j) R+ J- n: a9 l5 `/ B; ]: b' [; B: I9 |$ {
7 A( w# u5 f/ g8 z4 B" V. N

* q& T4 G5 m! s4 \" V" J& G. ~) i! ]1 X, Q" H

6 @9 p7 _; I2 |1 K2 y  q/ C) p
0 {' U  S9 y! S6 @3 f9 S0 T
( ^* B0 p* Z# I! c' z7 r; ~# _: u$ e7 e/ i. u0 O
% L1 X* i- L4 c* v) v. K0 {

7 r4 _3 u4 F/ T) l
(4)配置 kafka 及启动
- H/ ]1 a* f1 W  [! N4 ^) _

( S5 \4 S$ ?8 s6 ^0 w
) J  b3 C. C2 w2 w/ u7 o! w
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1. ]# y# G. i& e/ l3 P+ {2 N1 y
#北游,新建个空文件夹[root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data8 p1 q9 D2 v5 V, _( ~
#北游,更改配置文件[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config[root@ruanjianlaowang158 config]# vi server.properties
: W( Z1 ?& T+ E7 t#需要改3个值#log.dirs=/tmp/kafka-logslog.dirs=/root/tools/kafka_2.12-2.4.1/data#listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.85.158:9092#zookeeper.connect=localhost:2181zookeeper.connect=192.168.85.158:2181% `/ A! K( i. M- C3 S  v2 n
#启动kafka[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &" e" X: C4 Z& R. y8 w
9 S8 a3 {4 t! S/ @% O5 ^+ W) k4 U

; W0 e, e* [$ R0 L6 }; M2 s- ]% R: w: \/ B9 _' r- T

0 E$ [- {- {' k* n( |8 `$ D! \  n3 ~, h5 H( m

; r- ]" s1 o& ?" Z' y+ N! I% Z+ W5 y- |4 p8 k6 I7 W

: W- p& s+ W2 ]5 w$ a) E/ O; P3 C1 m
% `/ _4 `+ U0 ?
复制代码
: X7 p. Y- T& F  f
2 V$ g6 a. w  Q& O+ K. U7 A2 V) ^; C: O* R+ S+ {" e
' h8 P. L+ a' J/ V7 _8 X
. a7 |3 h. X4 K6 f6 C
) r% L; T/ I/ ~  ^
% d  x* K, e+ l" x& u: Y
5 l! ~# _0 @, l

& ^/ M) ~/ K' Z9 M' E
2 n! b- C9 h# C$ c
) R7 H7 ]& Z8 C# r( g
' j% a* H' [  |0 j  Z& C& }
( f( r' n' d6 e3 ]' [' T+ O0 q; ]$ C( d2 x3 V( o
! `- K. K! a! S- P' ~3 {( I
1 X1 A  L) [/ Q+ p7 _
4 F; e+ F1 \: m( i6 F. `" q1 @# q1 ~

% N' B$ o/ W, h& ^% }) @
( e  t0 X! E0 {* T  H0 {. o6 Z# a/ m( {  |3 h$ X5 v
4 K; x) u2 V4 B9 I8 h7 L

$ ~  A# w8 ?0 }0 n* A' X; o  [; J) v
1 x+ w; i9 _! J; W2 ~
) F* d3 e/ I8 r
; U; o9 s. a8 X$ v; `, A1 F
启动完毕,单机验证就不验证了,直接在集群中进行验证。
2.2 集群部署
(1)集群方式,首先把上面的单机模式,再在 192.168.85.168 和 192.168.85.178 服务器上先解压配置一遍。

8 }) f4 N+ h4 H3 q
(2)zookeeper 是还是更改 zoo.cfg

! u: Z# N1 ~. P# j% y
158,168,178 三台服务器一样:

7 Q, y  u0 y0 m1 B$ C) u! Q
  L1 m' {* O& Q' n5 f

" F5 e4 C/ e0 b7 ]1 G/ n- M- h[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
- N/ [6 j; }3 W* w1 b[root@ruanjianlaowang158 conf]# vi zoo.cfg#其他不变,最后面新加,三行,三台服务器配置一样,北游server.1=192.168.85.158:2888:3888server.2=192.168.85.168:2888:3888server.3=192.168.85.178:2888:3888
3 T0 l- D& j5 x158服务器执行:echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid168服务器执行:echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid178服务器执行:echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid  u! [# Q) Z) I! O6 w: o+ Y  t8 _
9 o- m/ @$ C9 g; \. ^% D3 M

$ }1 F" V6 N( \: K2 _0 {& o  J/ R! ?' l8 k- m
/ h1 H3 W4 k! X$ D6 K8 X
7 _: u0 u+ S+ G2 y2 L0 g
3 F7 t% q" ?9 a% _3 y2 ^+ j" k& I
* p0 f% l' M- {. V" q' I& o

- U# c! |4 y) ?8 \% O$ [
: ^  E8 F0 ^0 V8 {1 p$ R  I( F7 N1 a4 I$ N; Z( c
复制代码
4 r4 p; p1 s$ C5 m" ]
) \3 ~" X/ m2 B8 y" D  k( M, g0 r9 M. R

; B- p. P& a' ?6 a2 ^8 Q2 t; }- \+ Y* r3 J* y1 R( |% `3 H
0 @2 j3 u9 n1 y
! N1 i* w3 ]; h. V/ ]6 F# U/ d/ w

( S( P% ~, b4 O$ M! `
4 {1 I+ _: P1 e) B/ u1 q
* R% t/ O, P' a. d: j4 K. k& _, q2 H$ O2 W
, V! q/ S+ y% I3 ^
3 V2 R; R3 t& N: I
! f: F+ d' e; ?+ m" \" F6 ^) j# \0 m
( h8 k9 G9 p" z# b6 h/ S& g

- _5 K- D4 _. r/ k6 M& p+ G8 A; l( U& S2 ~3 V
( j8 s' E- @6 U- a: n

& @! g+ m9 F7 T, t( ]6 v# L2 r' z+ k4 c4 B
(3)kafka 集群配置
, N. t" y5 c; _1 a
& j4 Q# u7 ]% u  q1 M* L

0 }5 w1 ^5 K7 {" ?9 E[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config[root@ruanjianlaowang158 config]# vi server.properties #broker.id 三台服务器不一样,158服务器设置为1,168服务器设置为2,178服务器设置为3 2 H0 [5 `* o+ S7 k
broker.id=1#三个服务器配置一样zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181* T) j4 y5 U, W

/ W5 D! H5 v  H4 d% F) y, X& _
, ^4 `, ]! k& f3 j0 i3 M0 i. U( H
/ F( q6 F) `$ I, G+ W4 v1 F5 I2 q  z2 A- s3 m
6 q: h9 P1 R% v- ?' r6 j
# d/ y* ^& K8 L4 q' c: V3 X
; n+ L/ M; V1 \0 d) {, L
4 S% a( V; g& P. T/ ~; b# f( x9 U

! l0 x) k$ k* M4 d0 l5 [0 o$ N
, w+ G5 G# k0 Q复制代码/ H2 r; l7 A' ]& d5 G/ l( h

6 x! [! A4 m6 a7 @* w8 B% P# b& M* M
8 }7 z% h0 G' u6 _- q( E- M+ O4 y! }- s8 c# V% n- |* k
& _, q( k" w6 F2 z; P: ]) Q
0 d8 M0 w+ [2 q! z, V" R

. ?' f2 _" u: j
$ [# q* |+ V( C+ @  Y
7 `: H% M6 b5 l. F, K+ y1 V( P( |. D. ~" Z4 g4 N
0 G& B+ X7 X: K; |% K% n) i

. S4 P6 C" t/ V! [* h% H! Z
7 G" \& Q6 J0 z! i- P3 _; t
Kafka 常用 Broker 配置说明:
( T2 {* M' p  E% T! r8 g  W# m3 \: I

+ C( c7 b7 t3 @0 E; N, G
* L( ?: \; o' _0 ^. g6 x" n8 t+ ^* E

  M( ^+ o  u+ n6 E' P! H9 F. H
192.168.85.168:2181,192.168.85.178:2181 | ZooKeeper 服务器地址,多台用","间隔 |

; k: A3 A6 g) f5 @5 f3 J) V
(4)集群启动

6 }' o5 u& x0 D% S! c2 ^( ]! n
启动方式跟单机一样:

' H, D' s( J. e- ~) F
8 J% o9 W1 H* Z$ H# Z0 M; n

- h  y: w9 ]3 P$ s; J#启动zookeeper[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin[root@ruanjianlaowang158 bin]# ./zkServer.sh  start
. I- W: w# k3 z- H' N1 n#启动kafka[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &
' n# f' j6 d  u+ v/ [6 z- R" y! i* W( _

& `3 W1 P0 U: a3 t) s- P) }* W( y+ L8 ?, Z# ?
! `; g0 z: m+ K) W9 p; l

$ g- x' Q2 d0 A2 ?. G$ l) D* {( k6 h9 u" }- P
, K3 a9 Z# ]& B' e; @" _

; |+ B; O  v2 P  M9 h5 N( L
! T8 g6 U+ N( q- }' d8 q; E1 W$ d
复制代码. H3 ]! H! B- S1 T
, S0 W. |/ x: Y( E' c. Q

; ~9 [8 F. \! O% [7 `% ^
- f" U- O, |2 k. X
3 i$ G% f: A  V" C% L* {, h) @9 ^. o  e3 X! M  U! V7 ?; n
6 t# }: B' p- x8 k  y
$ t* s& ^  W" S/ C* @5 h  ~3 }- `
1 m( l$ Z8 @' k6 U
9 b2 `0 S! o( V+ i$ a

$ n& {4 {" W: T+ c2 K3 _
9 Y. Q0 A* i. }2 P2 ~( D2 J! c
, c" z- o8 ~4 q/ X5 u+ O
(5)注意点

) l, d& r& \5 I4 ]8 E# `) Y) N0 H9 b5 u' p
) F* E! z2 m5 d6 v  u! D( g
集群启动的时候,单机那台服务器(158)可能会报:Kafka:Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties.方案:在158服务器data中有个文件:meta.properties,文件中的broker.id也需要修改成与server.properties中的broker.id一样,所以造成了这个问题。, Q7 T! p$ W  K$ Y: d3 y$ u
6 D8 b; o7 D, o1 r

% {' i5 n- A! u5 q3 c
5 z9 Q  R9 ]( h7 p# A" W
( g- a1 _. z0 ?
; C- k$ J- b/ \7 [* O6 ~8 W/ r5 B$ D9 |7 }+ G- u% B7 s. A; m& s
: u8 X& [  J( i  _
, C, o/ A. u1 @8 t: G
# E  G+ F* l; T( ~8 d% w5 X

- F/ @- ^( ]* W$ w" S
6 K4 E' P  y) }  ~+ C) y复制代码
; x/ N9 Z5 C  k; O/ ?' ^: c8 L. u: F# Y" J% b6 d; \( ]1 y

9 M- u4 r9 X+ W# i( Y4 U& t& |- O1 K' K6 f- ^( A; @* _

' V1 R( v0 j  J9 S* y1 O( X* N
! D% b  E  j# c" f7 H7 V
) b) }# _5 s' W8 M) S

& H0 L" l& l, U" I
(6)创建个 topic,后面 springboot 项目测试使用。
! f, v4 A" k/ x& o+ V8 F$ n

2 i; }* X- |/ y! M& L

- \% T' X6 k- Z+ m: q9 |! }[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa
. u% s; i8 F9 G4 K3 R8 E# N' b4 q1 B0 d0 m- y! x! E
& A, I% c' t0 F  \* x
6 F) ?; D, u7 O% s6 l9 |) g/ K
" Q. d. N+ v9 P% s# p% b. T+ P
$ u) H+ f. M" P: G5 f, _

7 \1 _; z% n: {$ Z5 r* K3 m- u
2 q& o9 F; W9 ]4 X' y! i% M8 A0 c: Y. W  ], u! R$ e
( ~, N2 F. _2 \/ d

( Y1 A( u8 Y8 C+ p7 j: t# K
9 I+ }) n  q% ]( q& e0 y9 Z2 ~复制代码" O8 j) O+ k2 ]0 c+ |1 a
# d' t% ^9 ?, f( B4 V
% \3 z5 O' P5 Z; j! q
9 v$ I2 _6 z. l9 s
0 a/ T( r2 r8 i' C$ U0 |4 M
% \# d* T4 J5 Y+ S+ ]
9 p! D* y/ S( @* Q
3、结合 springboot 项目3.1 pom 文件
8 l. j* g1 w1 {( O9 I0 |* \
0 a0 B6 A7 e7 P( n* y3 w
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.0.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.itany</groupId>    <artifactId>kafka</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>kafka</name>    <description>Demo project for Spring Boot</description>1 _; u0 q. B0 A( Q
    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.kafka</groupId>            <artifactId>spring-kafka</artifactId>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build>
) x/ `5 F" Q7 l. b1 p7 m</project>
; K8 \8 k5 `+ l& h  _  w3 {
, e* u$ R' Z) S" Q( Y5 ~* A4 C: I3 S$ ?: m* m9 S0 F7 V, A
  N5 m/ O" D/ D, b' D
' u% i8 ^3 F8 F) B* }

, I/ t$ Y3 B9 V$ C' q. l& ^  y  b5 m( H: A* ?1 x8 R% O

1 w8 N8 b$ w: g  q2 M: A8 h2 n+ t, D' l+ M4 d7 n5 `5 }' p

4 F1 {8 Q7 Z! V% n! d+ X. M* ^" W3 V4 k4 j2 W) S9 W& _
$ y9 a: O, x2 }" {! `/ A
复制代码) o) Z  d9 [* S  |

' _4 V* N1 y: ^( \& v6 m% B5 Z9 O; `8 _: L7 V1 X$ ~% X: I

9 p: N1 @& n' T! Q0 \3 u. Z
( N  ?3 Z  q( X% C8 D3 ^( H% E% {7 J0 m+ M3 A2 S9 p2 u
8 _$ n5 L- P1 c0 Z& z

& ~$ q- N6 p7 v0 |( a
  B) y6 D& P; {4 E. d1 i) v8 E, t4 Y0 K0 l# E

3 k; h0 u/ W; Y! }9 W8 z6 w, C, a9 `  Z* ~' u. j
" u5 g+ d* O8 C- `0 Y" ^2 ^' ~2 r

( L1 P, H4 S  C9 M" m
3 ~4 q8 c3 I9 t
. c. g0 f# K( B4 h* k4 J" p
8 ~9 ]4 K0 z% q4 ?( }3 V+ S# e4 Q4 S+ g: z9 G
# B1 C2 T! Z5 c

' W0 G; E# h0 G& ]/ a3 e6 W. J" n
/ e1 ?* x# q! M! [+ K1 N- {$ \; C1 |8 Y8 X2 q
3 n* d4 g+ a3 q8 u- M& f7 A5 u
/ o9 M1 Z9 k  @1 \4 B8 }! B+ F

2 c; S, n, v/ O0 ^
& h" X" @( f8 q2 q, Y3 K0 o: N
: s/ o3 d9 @, x2 b; s4 N: s+ m  e$ k6 g: F5 ~
+ W' O( i. b1 h; G& \$ u" s! ?4 w0 n

; w: b5 q) {: n& [" a. C7 ?
/ V0 _( g. Y# d* W# K& Z  O
" _) ?. F8 }0 G, B. t2 o$ e- G# U+ K2 v( P  I5 }% c
7 a, O* ?% Z7 V. K9 c' p

2 A" O" V/ t3 C% C8 V0 d
( d8 ]& A: c2 Q) j" Y8 r
0 c* \3 v; B7 B: ?% H$ m/ I3 c
+ `# p4 g5 v# P# K6 I7 ]3 g  ?$ c  u2 P: U" M) c: l7 B

9 p2 y# t* q: r# s3 e- v1 a9 h5 v) ?& C: p+ d% g' s. Q+ B
! b  N+ q- t& {9 k% o
7 w7 o& ?# r. v7 F" V

8 H; m$ `. U- g! D! J3 @; v5 m
2 }0 H+ T; w5 w; `+ t4 z' c
说明:

  L4 D& ~5 t& r# I. h" I0 O
主要就两个 gav,一个是 spring-boot-starter-web,启动 web 服务使用;一个是 spring-kafka,这个是 springboot 集成额 kafka 核心包。
3.2 application.yml
, t. `  d8 I  f7 m* }
' ^+ Q0 F, y8 P$ e
spring:  kafka:    # 北游,kafka集群服务器地址    bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: test      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer4 N+ d# j8 R/ j. T3 Y' |
5 I$ ?) ]3 E3 ]. [; u2 [$ ?& c

/ j7 O6 Y( |/ w+ R) }9 |0 ^3 q8 C3 m

7 u$ }9 O  ~1 R8 d: C0 d/ b8 w9 o' q6 P6 K1 k
( x. r7 P% |; G2 _2 |  p% [) r$ {6 r

; p: Q" V8 j8 a- `2 r! ?, t, f( B$ F- _
3 ^$ t4 A6 c. v9 H; P! ^+ K
7 ?/ \7 N4 _( [& R) H$ U0 I3 D
复制代码
2 |+ }9 {- U3 V- _! o2 s4 b. \! X0 |
2 N4 E. q/ Z0 s; x7 E6 K; O5 l, Z( D& p% Z# L, u
) I3 o2 h* s' a( c- S- L

* M/ t( J+ t1 T; U
# q/ w8 E$ G% D9 N/ S% {5 @7 I2 g# [6 p; N" e) s0 E

0 p/ v+ U& ^# y7 b8 Z; O. S
) B3 u, P5 C0 r0 p% n4 ?
( Y; [2 ~, Q$ {8 s
/ B0 _4 V' X# R5 x& B
/ s1 f& o$ ^+ M% W( [( q
2 `* b$ w- j7 L! Q: p( r& m8 o
4 I1 F8 f( f6 o- n( M; {: t, [3 t. p1 h8 [! R; \: t; a' D
1 @, H+ }" M+ X( E- V/ Z# W, d! c
3.3 producer(消息生产者)
2 y" Z9 |& b4 H, I- w

# Z) f# j* H0 `@RestControllerpublic class KafkaProducer {    @Autowired    private KafkaTemplate template;     //北游,topic使用上测试创建的aaaa    @RequestMapping("/sendMsg")    public String sendMsg(String topic, String message){        template.send(topic,message);        return "success";    }}# t5 q, G: ^, _1 I1 C

# V2 e. y* w3 X# Z8 n* Y: n& ^( L( G

  \/ O3 L" g* z. q. m( g5 U. e" ]+ x& l- C
" @: {' r1 q; W: A

* T3 ]2 `9 J8 ~- K/ [+ ?
) k* T0 j1 d: e9 {+ e3 E5 N7 {+ ]+ N& B+ ?- C+ N  r
8 z  }3 v8 M0 ^. s) E
, _3 J! i  }7 Q$ E. s3 G0 Q/ Y
复制代码- R; v% b+ `2 g& X6 s) A/ c+ K

& M0 \. R* t% |& H0 x% l. m& w% q
' |! }# K0 ~9 u+ y% V+ ?
5 r& h6 J3 V* _, U& t( F  `" P" ?5 I- g# |2 y1 G
0 A6 a# ?0 v2 E/ Z

' b* x  j5 W, v% y4 R
- e# y5 t! n9 {
2 W1 A  e: N1 P6 X* u& o! f
& k/ P, Z5 [" Y, x3 W) c
- I. Y: r3 J' B$ }( P8 f! y* u/ _2 X9 p& v8 U% d2 O
7 U/ W/ G- I6 X7 E& G

4 a! ~  I/ `: z+ O- ^" [& Z* b! \1 y6 |1 t

8 d3 p1 r# S, e3.4 consumer(消费者)/ C: W& S, c9 J/ V, y

& ~# z- l- o" B' o  _" }2 K, G+ H/ N@Componentpublic class KafkaConsumer {   //北游,这里是监控aaaa这个topic,直接打印到idea中,北游    @KafkaListener(topics = {"aaaa"})    public void listen(ConsumerRecord record){        System.out.println(record.topic()+":"+record.value());    }}3 o% D  x& t; {: }

  z, }) f: k+ O  Q5 x2 M) r
( A( M- N- A+ Y1 f# j% W, ~' [" j% ^" }7 L; U0 e' ^

9 w" A$ x5 a/ p# v. t' l' s# v. d$ h2 y; Z& ?; E8 R

% i4 g- L1 n* C
6 T; E4 Z& P/ R/ ^4 D1 G4 g
9 G0 x9 a5 C, H0 q3 u* K+ {7 G
* t3 t( l8 D; m0 f1 B! R$ }1 U/ L. K: ^/ Q
复制代码
2 F! p; a) I0 n! j1 q; m* O' F2 r, _) z3 r' _# d' p6 H

9 e! y: E' B0 Q- b! t+ r! E$ V, O5 G  }1 V; A8 H* g

8 E7 a2 l6 H( W7 O3 j; \# n& v
9 ]0 j* o; D" x9 L1 ~! s
5 v$ O; o- [, j5 u
( f- b% W  o: v  O, b8 X' @: }4 ]& \+ n- g" @. x$ I
+ @. j* `: U; d$ H. a# s

5 y) n, F9 `9 R7 N% H! |$ G( C  ~8 ?. p( j; ]+ |
: L2 t  c( s! F5 F( \
3.5 验证结果
(1)浏览器上输入
. Z( J% Y( w# ?9 ^) H( s) M* [
$ Z( B3 c3 c; M: D. e  g
. L; v8 L" A  H$ B' b4 v9 o
http://localhost:8080/sendMsg?topic=aaaa&message=bbbb
# Z% Q2 d4 d% C9 k% }( O) E# }9 M0 l
  H) V6 p* I+ o1 _

3 ]: U0 R. I. n' i' k) ~0 y) o7 r0 }% [0 Q1 ?& [; d

5 Z- ^7 ]: I+ U( N
  m2 Z  e) F- U
; s5 h) f+ j  o& J' `+ K, Y" f6 g
8 G2 e# V* c- ?7 W- @" D6 f* [3 ^1 U5 A
( u! J1 K4 W! q% W& U
复制代码
1 p+ T3 `) X; A- @/ p1 S3 N% h; \$ B/ M
4 k& W% {" {+ o: P( G: k

7 E4 T6 Q9 j$ `. J- s3 j, b) }9 V) D4 W! n& n. @) y

* D% f  Q. x- N5 B% d* C# R' h! Q5 t- V- F* s/ H
(2)北游的 idea 控制台打印信息

. V( F" H) [0 [9 b7 H5 Q: r! G
0 J& n' j: \1 J* k
9 y' h. f  m8 H
& @' [5 x/ ?7 ]( R. t
二、Kafka 副本机制1、什么是副本机制:
通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝
2、副本机制的好处:2.1 提供数据冗余
系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性
2.2 提供高伸缩性
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量
2.3 改善数据局部性
允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
3、kafka 的副本
(1)、 本质就是一个只能追加写消息的日志文件

( k: B5 p2 S  V
(2)、同一个分区下的所有副本保存有相同的消息序列
( R/ E. q/ f5 J+ D  S7 g$ ]
(3)、副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用(Kafka 是有若干主题概,每个主题可进一步划分成若干个分区。每个分区配置有若干个副本)

, g$ |4 f* W" V2 p& j$ D- [# i
如下:有 3 台 Broker 的 Kafka 集群上的副本分布情况
! V) |: q1 E- s* t+ d; e

3 n9 E: R0 S  p8 b

9 r0 G( w2 [+ y1 d6 r/ H4、kafka 如何保证同一个分区下的所有副本保存有相同的消息序列:
基于领导者(Leader-based)的副本机制

$ j" ?/ [  w' B* H% z; Q+ `7 C
工作原理如图:
+ K+ A4 y' c' v& g" g! l
3 c7 z; _# L% `8 N5 B% d
8 V6 T/ A# Y# j# B

/ u1 R# g) R/ F- _
(1)、Kafka 中分成两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。

) ]- b; K; ]& h2 L" z
(2)、Kafka 中,追随者副本是不对外提供服务的。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。(因此目前 kafka 只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性)
# c4 h- g/ R! [4 E8 f+ \
(3)、领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
5、kafka 追随者副本到底在什么条件下才算与 Leader 同步
Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的
6、kafka In-sync Replicas(ISR)
(1)、ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本

5 ~& x! L% v& D+ q/ l( P9 W: E
(2)、通过 Broker 端 replica.lag.time.max.ms 参数(Follower 副本能够落后 Leader 副本的最长时间间隔)值来控制哪个追随者副本与 Leader 同步?只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

, ^, @. c" [6 m1 {7 F& R# ^
(3)、ISR 是一个动态调整的集合,而非静态不变的。
6 a3 K! v4 d" b) Y( Q2 `
某个追随者副本从领导者副本中拉取数据的过程持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
2 {( t$ C( Y! h' L& u
倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。

' o$ a# c2 Z- Y1 }
(4)、ISR 集合为空则 leader 副本也挂了,这个分区就不可用了,producer 也无法向这个分区发送任何消息了。(反之 leader 副本挂了可以从 ISR 集合中选举 leader 副本)
7、kafka leader 副本所在 broker 挂了,leader 副本如何选举
(1)、ISR 不为空,从 ISR 中选举
' i* U: S) a5 a. d# _
(2)、ISR 为空,Kafka 也可以从不在 ISR 中的存活副本中选举,这个过程称为 Unclean 领导者选举,通过 Broker 端参数unclean.leader.election.enable控制是否允许 Unclean 领导者选举。

' _5 J" A* j) x, v# E+ O% O
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

0 W. d8 F! _8 T7 B* b7 c
一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

3 T* X+ o2 `: g! z
强烈建议不要开启 unclean leader election,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

$ O! }5 w9 K3 O( e  `
ps1:leader 副本的选举也可以理解为分区 leader 的选举
, C1 k  k+ n, L6 a! H# B
ps2:broker 的 leader 选举与分区 leader 的选举不同,
4 Q) A: e1 d: m+ K
Kafka 的 Leader 选举是通过在 zookeeper 上创建/controller 临时节点来实现 leader 选举,并在该节点中写入当前 broker 的信息

# c( ^. o  v7 M- C: P+ v9 R/ |9 K4 u' T! R, y  N
: r0 X/ W( [" P5 U2 \
{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 6 a7 y0 q. ?% Y
: V* {. G1 r6 ^# B, G* w# t/ E8 \

/ c1 P- d* G! z8 C* {
/ b1 R& l! p7 Y% X: C/ L! l2 x! c) E; Y* b+ [; I
% a0 H' Y$ n- G/ R3 T2 V
* q6 T3 t( U, M( r9 c& }9 X% f) w
& D" _2 |* d) c* c4 [
2 N# F" T. a3 t* }; Z6 b
: t7 i* B  _) A5 k, F6 S9 b' q
. k5 m( y& i! v1 J$ e& ]
复制代码
: O& @  g: y( {7 v/ ]. F2 A. c9 M9 A, A0 |) v4 G3 N

6 g0 |% i1 ]3 D2 d/ d' \
1 n/ S  F" [8 Y( \) I5 j' i$ f# ^. C: J: f2 c$ `0 w
3 Z4 i$ d8 U* _7 i" K. N

& c9 @# d- n" k/ {
利用 Zookeeper 的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的 broker 即为 leader,即先到先得原则,leader 也就是集群中的 controller,负责集群中所有大小事务。
& |. p0 @, {+ O
当 leader 和 zookeeper 失去连接时,临时节点会删除,而其他 broker 会监听该节点的变化,当节点删除时,其他 broker 会收到事件通知,重新发起 leader 选举
  ]2 ~4 q0 O# t; ]0 Y8 A4 [3 o
再给你们留个小问题:如果允许 Follower 副本对外提供读服务,你觉得应该如何避免或缓解因 Follower 副本与 Leader 副本不同步而导致的数据不一致的情形?
三、实时日志统计流程1、项目流程
在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人认为直接使用 Storm 进行实时处理,去掉 Kafka 环节;也有认为直接使用 Kafka 的 API 去消费,去掉 Storm 的消费环节等等,但是最终组内还是一致决定使用这套方案,原因有如下几点:
" ~, n# _8 s, ~, I1 H' i
  • 业务模块化
  • 功能组件化
    + [0 {0 F8 F+ ~4 u
* w& ?) }. Y* t: t- A
我们认为,Kafka 在整个环节中充当的职责应该单一,这项目的整个环节她就是一个中间件,下面用一个图来说明这个原因,如下图所示:

! S. Y6 `, Y- }5 L8 T# ~8 [
. b% H5 @8 o9 U2 s, x' x

# }4 B8 o2 g9 T0 f$ X0 m3 x2 Z# @7 b
整个项目流程如上图所示,这样划分使得各个业务模块化,功能更加的清晰明了。
' j/ o* J* t$ F
  • Data Collection
    + O: _$ v0 k3 y/ L2 P3 Z. @
$ M4 X7 _% D  _
负责从各个节点上实时收集用户上报的日志数据,我们选用的是 Apache 的 Flume NG 来实现。
0 W& H& l* [! N* S2 a0 n4 I
  • Data Access
    6 W5 @4 g% e6 i8 y/ M  z1 @/ E6 ]4 u
7 F1 o9 p; }+ F$ j+ b# a
由于收集的数据的速度和数据处理的速度不一定是一致的,因此,这里添加了一个中间件来做处理,所使用的是 Apache 的 Kafka,关于 Kafka 集群部署。另外,有一部分数据是流向 HDFS 分布式文件系统了的,方便于为离线统计业务提供数据源。
  h0 B. b/ r+ h3 m
  • Stream Computing
    ! C/ u1 x  A$ v& y3 Q, A
2 b( w) x5 d, K9 I1 L- W
在收集到数据后,我们需要对这些数据做实时处理,所选用的是 Apache 的 Storm。关于 Storm 的集群搭建部署博客后面补上,较为简单。
+ @6 B' L; g( W' ]
  • Data Output

    4 Q0 g2 v2 V! k
4 s' _  \9 S$ f. ^& X, P) g
在使用 Storm 对数据做处理后,我们需要将处理后的结果做持久化,由于对响应速度要求较高,这里采用 Redis+MySQL 来做持久化。整个项目的流程架构图,如下图所示:
/ l3 A2 H# Q! H% ?1 X

- G5 ^% n1 t4 ~* E

3 M( x. @5 `. ~  H6 v- I+ c7 `) E2、Flume
Flume 是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS 等),便于收集数据。Flume 提供了丰富的日志源收集类型,有:Console、RPC、Text、Tail、Syslog、Exec 等数据源的收集,在我们的日志系统中目前我们所使用的是 spooldir 方式进行日志文件采集,配置内容信息如下所示:

/ U8 [! w7 w& y' `6 R% k* M& Y. Q- ?

8 o3 d9 I, g( Q  Tproducer.sources.s.type = spooldirproducer.sources.s.spoolDir = /home/hadoop/dir/logdfs
8 Y. ~5 J# S! Q4 S+ t, v& Q2 P4 Y2 V0 k5 V( M$ C/ m
8 s+ R5 |) s. x& d
' n2 s# D) j4 v7 ?, t) V4 M

" z$ S3 O7 r* n2 N4 B3 I# {2 k. ~5 z/ u$ T- m+ Q' ]8 L. \8 W

2 O0 n' G$ V) M1 H6 ^2 x6 x: n4 }. {! ], g7 \' K! f
+ r! T: h! k4 w
8 P% A+ v2 M9 ?/ J- Q$ B8 h

% [; Y5 i7 H9 R6 v. C! [) h复制代码( Q; b4 |  R6 u3 R, h

7 b0 Q1 h- j* H2 M! t
; y, Z- |' S6 W9 D6 K) A
  B6 M( B  g1 q1 ?% n
; g* }3 P. ^# C% `) X& ~: q; \$ M3 L6 o8 z8 R
; h1 x3 m& O# H
- S3 }- S# {3 y) y' m
当然,Flume 的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC 等,这里我们系统所使用的是 Kafka 中间件来接收,配置内容如下所示:
8 r# j- x( S+ J0 Q% n
2 I8 F) D* j. B( k* J6 K! }( U

, B1 s* X2 B% d0 Pproducer.sinks.r.type = org.apache.flume.plugins.KafkaSinkproducer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092producer.sinks.r.partition.key=0producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartitionproducer.sinks.r.serializer.class=kafka.serializer.StringEncoderproducer.sinks.r.request.required.acks=0producer.sinks.r.max.message.size=1000000producer.sinks.r.producer.type=syncproducer.sinks.r.custom.encoding=UTF-8producer.sinks.r.custom.topic.name=test
; k* I& B3 w# O' A4 Z( d
" C/ Z' I* w+ n- M  x8 ~7 u- H! p; p1 K6 _' [$ o
$ Y+ E* L4 d+ r, x' b2 J3 W- u

- V9 X' r* j; ?# Z/ u: ?* F' r3 @
$ t7 O! z) [" a  W+ }
' o. u2 c% ]; t0 {

3 ]' e$ ]4 p. a! B9 _7 p9 J% |7 l
0 |9 z& j; @& p9 j6 H
/ q9 T% l+ x, v) s复制代码
$ C: U, A3 L/ l5 k/ X# W: [) q# i8 j, u' r" d, W3 ]3 e5 j

6 Z- B* S$ }4 F: B
7 p  d8 u* D/ Q- a" \
2 c, J. h# k7 F0 A5 H, W7 c2 {
+ Z0 B$ p2 Q- B0 X. {# ^: J

) \9 t: g- J( E; q! e& A
: ~2 t- I; ]9 o$ o+ P0 U- L
! V' p& Z+ P/ G2 ?$ b. f. m  `  G% `" x0 m: ]8 o9 u3 K0 W

) |$ W! L) r: L  E; A9 {4 g3 A
8 V3 Z9 _8 j! J# {6 o$ G( ]( L. f1 f7 P$ M) T+ Q( X% T! ^: C( f4 R" A
+ ]4 C3 w; p/ C9 p
3、Kafka
Kafka 是一种提供高吞吐量的分布式发布订阅消息系统,她的特性如下所示:

) O+ O- T% h3 u" \6 F
  • 通过磁盘数据结构提供消息的持久化,这种结构对于即使数据达到 TB+级别的消息,存储也能够保持长时间的稳定。
  • 搞吞吐特性使得 Kafka 即使使用普通的机器硬件,也可以支持每秒数 10W 的消息。
  • 能够通过 Kafka Cluster 和 Consumer Cluster 来 Partition 消息。

    8 n2 [9 E  Q! y) N# x. t

7 X5 n8 M- n# ^* J6 D
Kafka 的目的是提供一个发布订阅解决方案,他可以处理 Consumer 网站中的所有流动数据,在网页浏览,搜索以及用户的一些行为,这些动作是较为关键的因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于 Hadoop 这样的日志数据和离线计算系统,这样的方案是一个解决实时处理较好的一种方案。

6 O) z0 }6 D3 F! I
关于 Kafka 集群的搭建部署和使用,上面已经写了,不会的朋友翻上去再看一下,这里就不赘述了。
4、Storm
Twitter 将 Storm 开源了,这是一个分布式的、容错的实时计算系统,已被贡献到 Apache 基金会,下载地址如下所示:

4 K' s' E9 e$ G/ l  ?
4 ~- ~  H% p5 K# s
, T5 @; b5 e# {3 L! [
http://storm.apache.org/downloads.html
0 [, A* g( U( Y& @6 l- z
3 z& F9 x& n  [7 X( V4 |3 u/ z
# r1 _$ \+ a0 J
# ]; `) p$ E' A& J" N; G2 \" V& _3 ^( e" ?; c/ V
# z* U; |  W; M( X% U+ N' ]
0 F" [$ L) g! Z& a# S! U6 k

$ q1 ~" U& Q' K0 l1 Q# M+ b7 @- f1 m
& q5 e* k8 E# u) }' ]

9 X* k) Q/ L, b4 K; W复制代码- U* {( x4 C* |6 y& n8 q- w1 }

) X4 d6 a; J; Q/ o! n" k4 w
4 {: S* N% D7 h! u1 V8 ~! t/ A
3 p1 B& b+ i  }, ]5 U: A& m5 u$ \& c$ I7 J

8 `6 W: @+ y: \
+ L) T; \# ?6 z/ G4 W5 _
Storm 的主要特点如下:

$ ?. X7 a3 V2 m& C% r
  • 简单的编程模型。类似于 MapReduce 降低了并行批处理复杂性,Storm 降低了进行实时处理的复杂性。
  • 可以使用各种编程语言。你可以在 Storm 之上使用各种编程语言。默认支持 Clojure、Java、Ruby 和 Python。要增加对其他语言的支持,只需实现一个简单的 Storm 通信协议即可。
  • 容错性。Storm 会管理工作进程和节点的故障。
  • 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
  • 可靠的消息处理。Storm 保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
  • 快速。系统的设计保证了消息能得到快速的处理,使用ØMQ 作为其底层消息队列。
  • 本地模式。Storm 有一个本地模式,可以在处理过程中完全模拟 Storm 集群。这让你可以快速进行开发和单元测试。
    9 P8 h4 D! b7 q" _" m3 R

% k" \5 ?  }* m0 F; z: y; }) o# m- a
Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。

- _% _8 O. l4 a! f
Nimbus 和 Supervisor 都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由 Apache 的 ZooKeeper 来完成的。
  K9 \" e5 o+ s* x0 y
Storm 的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。

! L7 E) G  i+ _8 a% S
  • Stream 是被处理的数据。
  • Spout 是数据源。
  • Bolt 处理数据。
  • Task 是运行于 Spout 或 Bolt 中的 线程。
  • Worker 是运行这些线程的进程。
  • Stream Grouping 规定了 Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为 Shuffle),或者根据字段值分配(术语为 Fields),或者广播(术语为 All),或者总是发给一个 Task(术语为 Global),也可以不关心该数据(术语为 None),或者由自定义逻辑来决定(术语为 Direct)。
  • Topology 是由 Stream Grouping 连接起来的 Spout 和 Bolt 节点网络。在 Storm Concepts 页面里对这些术语有更详细的描述。
    + |; R9 B4 F: P8 _$ l2 _3 f- D! w

- s! m+ f: B0 }) U
关于 Storm 集群的搭建部署,博客在下一篇中更新,到时候会将更新地址附在这里,这里就先不对 Storm 集群的搭建部署做过多的赘述了。
5、总结
Kafka 日志消息保存时间总结 Kafka 作为一个高吞吐的消息中间件和传统的消息中间件一个很大的不同点就在于它的日志实际上是以日志的方式默认保存在/kafka-logs 文件夹中的。虽然默认有 7 天清楚的机制,但是在数据量大,而磁盘容量不足的情况下,经常出现无法写入的情况。如何调整 Kafka 的一些默认参数就显得比较关键了。这里笔者整理了一些常见的配置参数供大家参考:

  H# e5 U2 Y0 F2 `+ W1 T
分段策略属性

$ c1 f4 O: J9 y& b
! ?4 B, v/ z4 G( v7 K

/ G4 `& p. M: C6 t, h! L; g
2 c, b% ?. |% {2 S3 a- p! K# g  s
日志刷新策略

! L1 u7 l" \! C/ J; E) M
Kafka 的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。
- p# J" A8 [. o2 m

5 q  i  _: [% b3 S1 p4 E/ Q
, B5 ^2 \  K: u+ B

  V9 g  t9 ?# ?9 ]) b, T! \
日志保存清理策略

+ m4 ^: s& x8 _$ C3 p' X, Q* m% I
3 A/ H7 T* _% v. C$ U& m( ]2 o

5 S: Z! P/ e% |% ^! i3 H
这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。
8 }! V' b. E0 F
但是文件本身,仍然是存在的,只有当过了 log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。
; K/ x. `0 J0 Q3 @

0 M) B7 h: t  x+ G' V
文章写到这里差不多了,比我预计要写得短一些,因为还有一些东西要写出来难免长篇大论,篇幅不允许,想更透彻的掌握 kafka 的同学可以领取我整理的完整版kafka学习笔记,最近要准备面试的同学可以看看我这份kafka高频面试题整理
! M$ g" b: k# i* ^6 W7 \+ c
后面我会把另外两个中间件也分别写文章分析,可以给我点个关注第一时间接到通知

0 Y6 f7 O1 B" ^3 T. s0 T
您需要登录后才可以回帖 登录 | 注册

本版积分规则

返回首页|Archiver|手机版|小黑屋|易陆发现技术论坛 ( 蜀ICP备2026014127号-1 )

GMT+8, 2026-6-12 04:24 , Processed in 0.019369 second(s), 23 queries .

Powered by Discuz! X5.0

© 2001-2026 Discuz! Team.

快速回复 返回顶部 返回列表