找回密码
 注册
查看: 411|回复: 2

Kafka单机模式和集群模式环境搭建

[复制链接]

1

主题

0

回帖

12

积分

管理员

积分
12
QQ
发表于 2022-12-30 09:02:58 | 显示全部楼层 |阅读模式
单节点服务
; |6 g$ m) }- f8 B$ I- z+ L1、下载解压:; X( h/ ]! H' Q" L$ J3 K! ]4 R" X4 L
http://mirrors.tuna.tsinghua.edu ... afka_2.11-2.2.1.tgz2 e. M( W. a" x6 s6 v

* J9 L3 I( r& N2、启动服务2 W& s9 R8 s4 P2 N& G) F! Y' @
需要先启动ZooKeeper服务# v! {/ e6 m) N( W8 ]- h' p

. X, i/ [8 d. i$ zkServer.sh start7 u1 b$ [& f* K) L" H7 ^1 ^5 p
: v6 R7 Q1 \, Y" n4 n) ?% Y
# 如果没有,可以使用单节点的ZooKeeper
4 i8 }6 n! I2 n9 D0 z2 U$ bin/zookeeper-server-start.sh config/zookeeper.properties
1 {$ G. G. ?0 ]: k5 ?% _/ u5 b1 N( \. Q0 h4 H' X- `* R
启动Kafka4 F- L1 Q% l, q

: h% q# v1 M, U4 d( v> bin/kafka-server-start.sh config/server.properties: {, M" Z0 C# i: X, G- O5 B; X/ P

4 U4 ^6 V  P( Z7 v7 L9 l3、创建话题3 c$ i" o" w/ `  k5 O  i7 H

" c2 f9 o$ `& Q0 y" j$ H$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
* W! \. _7 i) l4 ]) ^
  |# l) T! t3 a$ `* }: E# 查看话题
# k' m) k4 I1 f8 y* v, ^2 {& D$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
7 w, X2 B, \# x  w- S# [9 i( G9 p; V& H+ d" ?# A6 g9 k3 t& E
; K: C$ H# s0 C6 _$ ]
4、发送消息
. y8 X9 B9 E! P+ S2 c
% |" C, O1 o& a5 o3 M$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
+ J3 t; T% a" N/ k# aThis is a message4 v; `+ ?* _& A. u
This is another message. l8 g% c( ~- K" M8 [$ T6 |# ^) N

' @0 {+ {8 P2 ~0 w6 d# ^5、接收消息$ _- t7 j' r: |  |) v/ @# U7 U

1 U! b% P' M/ [. m) J! b) o$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
% b3 \- B. x# y$ l4 _) bThis is a message9 x8 d" \  _' b5 ^6 j/ L5 L
This is another message7 q: a0 \: Z7 U/ B2 N0 r  B

7 q; ?7 K# q  ]4 u' E3 Y( U# ?多节点服务, W; z" Q& s; G# t+ w. l# X
1、修改配置
6 @  k3 |* O' p2 H! X  ubroker.id 在集群中需要唯一
0 n/ x+ ~7 c1 M7 t- U
9 u& L1 y* v7 \/ d6 u6 \# S$ cp config/server.properties config/server-1.properties. q% N% a, ~# W& k, s7 ~
$ cp config/server.properties config/server-2.properties3 m1 o( ?! m7 z+ C% F
9 i8 ^6 G. i" ~0 ^3 [1 a
$ cat config/server-1.properties:
) N4 x6 ^1 o* q$ O  a) }4 \% G/ `broker.id=19 @, G0 y. s4 l6 c# H6 O* {
listeners=PLAINTEXT://:9093
- s4 L$ k1 P/ Q8 }- G. y4 Olog.dirs=/tmp/kafka-logs-1* y8 L/ T& ^  }8 w' _$ S  S

( ^) a3 N7 x. c% M7 ], m: K$ L$ config/server-2.properties:
4 s8 i1 d/ B( q* h- ~broker.id=2' v) G% h4 }% y# k0 u: L
listeners=PLAINTEXT://:90940 q  ^" g: F$ J0 ~/ \
log.dirs=/tmp/kafka-logs-2; O/ Y1 R, _+ s# w6 b, J  c/ F7 T

1 w( l/ Y# `) b; @% a% u) k2、创建话题
# q* V8 A5 [, y% E, O3 q' m
6 H& |  E. q1 f> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
; o$ U3 q4 Q: ~, }# Z* x+ t4 A/ o3 Q* [: s8 J; s# b" {  D4 H$ y  F
# 查看话题描述
  c8 y4 Q9 }) m" j% o' S" a> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
0 |2 P# v% k9 ?" I5 G
0 y6 W4 R8 R( |> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic. I" ~9 A$ Z# B* v: }( G& Z6 F
$ V- s0 c+ c) `% x
3、查看进程,一共有6个(我的天)
+ ~  b) |+ M. o( i
9 X, T# M: h# Y8 f0 p& R0 P登录后复制 . j! c# R' H* ]9 M( E% H) z0 J( Z
$ jps
% M: y! C7 D+ I* ?) v* @72513 Kafka             # 3个Kafka组成集群
: J1 ^4 {' g2 f6 ]" z4 J, n72816 Kafka( F+ L- |: y4 E, ~
73081 Kafka
- P5 A5 R1 H: q6 I* T73872 Jps
* f3 U) v/ e2 w* U9 L73347 ConsoleProducer   # 生产者
6 G/ f) c4 T) W, P8 D73609 ConsoleConsumer   # 消费者
4 L8 e4 v( S) u35198
5 F) U: s% _& {( }4 Z  _68590 QuorumPeerMain    # Zookeeper2 c- b/ S7 }* v
: b: K" ~! m8 k; G/ V$ C
7 g+ C9 o$ ?* x5 H

1

主题

0

回帖

12

积分

管理员

积分
12
QQ
 楼主| 发表于 2022-12-30 09:04:33 | 显示全部楼层
2、启动服务
  o4 @. v7 }% L% ^需要先启动ZooKeeper服务
$ n4 J5 a1 f: P! {% E' S$ zkServer.sh start
$ ~3 X) S9 D0 ^  \# 如果没有,可以使用单节点的ZooKeeper, M# @$ m! ^" W: L; @- D1 H5 b
$ bin/zookeeper-server-start.sh config/zookeeper.properties
! ^# Y+ v+ B" P$ {启动Kafka
+ F; h# I# ?/ D, D) t. M3 j> bin/kafka-server-start.sh config/server.properties) c% d+ O  H/ D2 Z8 F0 f6 ~; F
3、创建话题# X2 T8 [5 d  u! Z' o
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
* {+ U" I8 [+ v% O) @# 查看话题
4 w; ?& K$ t7 I: ^, r8 @$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092/ {* c: X0 {  ]  _8 {& K# c* q; p. Y; r
4、发送消息
2 _& z! q) e. k( k( J  }2 g$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test( I9 V+ Q* b6 G9 a( ~
This is a message
( L, j% \7 {% ?# P, DThis is another message
% U6 y( \0 l" q$ {, S5 R5、接收消息
& b; M  J8 p0 N+ A+ |5 w$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- z4 E  o1 y( }  Q' U2 @This is a message8 G  z" ]& n9 d3 _6 L% D7 j
This is another message% \( v5 r% U; V
多节点服务
. O  F; W$ Q7 N' U# \1、修改配置* @* }1 E' t1 I. u- a/ j1 ^
broker.id 在集群中需要唯一
8 r  K& j; l$ a8 e' ?3 ?$ cp config/server.properties config/server-1.properties
* s' y" t5 v* W+ J1 F3 T8 s" L8 V$ cp config/server.properties config/server-2.properties/ b( _" R4 g9 m% q
$ cat config/server-1.properties:
# ?7 K% o0 V: c, c( }) wbroker.id=1: R3 a' ]9 A- V& Q2 I$ U( Y) X
listeners=PLAINTEXT://:9093; X4 _$ `8 B7 A* A6 `; c0 w( L+ o
log.dirs=/tmp/kafka-logs-1" p/ D" P; _) T: L  f, D. n

+ z* k& O; W5 I: R3 Y, v1 P  e3 M$ config/server-2.properties:& k  R8 A6 ^3 T, G" D
broker.id=2
& ?! t/ E. y3 jlisteners=PLAINTEXT://:9094
/ N1 z. h4 z3 p/ ]! I/ elog.dirs=/tmp/kafka-logs-2) P: h0 R2 D8 I6 i: f4 l
2、创建话题
: A% l; ?' r) d> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic! D5 T+ _8 N1 q! S. k
# 查看话题描述
- @0 N8 g/ ], r4 F. r. |> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test( O6 `6 `& I) J% x
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic: {1 d  H! M$ l9 |- J4 n
3、查看进程,一共有6个(我的天)
- J, C+ n; U( b# n$ jps2 l" t+ o! Q9 T
72513 Kafka             # 3个Kafka组成集群
; E2 H6 b6 A& ]1 A3 h72816 Kafka
! X7 y1 }- X- o& I& k2 H73081 Kafka $ M( z4 G6 N) i. W% A0 t
73872 Jps4 H0 D& D) _1 C+ E; r
73347 ConsoleProducer   # 生产者
. j, i  m1 k& j/ r73609 ConsoleConsumer   # 消费者
. s4 l* O2 S1 j' m351988 Q! @2 j% W8 t# x# @7 t6 @% z
68590 QuorumPeerMain    # Zookeeper

1

主题

0

回帖

12

积分

管理员

积分
12
QQ
 楼主| 发表于 2022-12-30 09:07:15 | 显示全部楼层
在了解完 Kafka 的基本概念之后,我们通过搭建 Kafka 集群来进一步深刻认识一下 Kafka。, G5 f0 ?% l5 N$ X3 `

: f' x. J1 h  C1 {0 n4 {确保安装环境( e( }$ F9 k4 L" G; N, v
安装 Java 环境/ \: B" G; B" L  Y2 {# t9 Y
在安装 Kafka 之前,先确保Linux 环境上是否有 Java 环境,使用 java -version 命令查看 Java 版本,推荐使用Jdk 1.8 ,如果没有安装 Java 环境的话,可以按照这篇文章进行安装(https://www.cnblogs.com/zs-notes/p/8535275.html' E4 L; \4 K* K+ @( `$ H! z, ~
1 Q6 \+ ]# p+ c' F( K) S, d
安装 Zookeeper 环境5 X7 E0 U& {. O# e7 w% I7 k$ g& @
Kafka 的底层使用 Zookeeper 储存元数据,确保一致性,所以安装 Kafka 前需要先安装 Zookeeper,Kafka 的发行版自带了 Zookeeper ,可以直接使用脚本来启动,不过安装一个 Zookeeper 也不费劲
0 z2 o# n" A# P. M' E
& {) F: I. B3 R" rZookeeper 单机搭建8 O5 `* @5 [) U$ _
Zookeeper 单机搭建比较简单,直接从 https://www.apache.org/dyn/closer.cgi/zookeeper/ 官网下载一个稳定版本的 Zookeeper ,这里我使用的是 3.4.10,下载完成后,在 Linux 系统中的 /usr/local 目录下创建 zookeeper 文件夹,使用xftp 工具(xftp 和 xshell 工具都可以在官网 https://www.netsarang.com/zh/xshell/ 申请免费的家庭版)把下载好的 zookeeper 压缩包放到 /usr/local/zookeeper 目录下。
" D- X! J4 j8 a5 l3 c; h) Z( H0 |6 a2 X4 F$ m
如果下载的是一个 tar.gz 包的话,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz解压即可% n! Y3 b% e5 l& K: G5 l0 E

6 V0 h0 v' S% x- n如果下载的是 zip 包的话,还要检查一下 Linux 中是否有 unzip 工具,如果没有的话,使用 yum install unzip 安装 zip 解压工具,完成后使用 unzip zookeeper-3.4.10.zip 解压即可。( A3 i" F8 q! S! z7 {4 G
$ q+ r8 K2 @) p' \
解压完成后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10 ,创建一个 data 文件夹,然后进入到 conf 文件夹下,使用 mv zoo_sample.cfg zoo.cfg 进行重命名操作' L% J: I" V2 ]" A/ Y
% P1 ^0 N9 F! {" M! c6 z* p
然后使用 vi 打开 zoo.cfg ,更改一下dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data ,保存。' k! H1 i' U+ u% U
/ r; F& X' U& m8 q( w* o! h6 L
进入bin目录,启动服务输入命令 ./zkServer.sh start 输出下面内容表示搭建成功! e, c" M) q& p2 ^' w0 }" I

, r  U0 b% p7 K  L+ q! c# C关闭服务输入命令,./zkServer.sh stop8 N* M" n; z3 Y+ O
& l! `4 v" y& c) @' l  m5 P

: E+ b4 w4 E; Q: f! W5 L8 y2 u9 R: Q2 z! C2 o* D) v
使用 ./zkServer.sh status 可以查看状态信息。
: ^0 |" w7 P7 @% [  N  x) Q# Y0 i
5 e, o: W# g' F8 Z' k9 @! xZookeeper 集群搭建7 W3 Q7 G' T# }8 d* a7 ^8 R
准备条件+ z" O+ s2 v: [. [2 Z# Q
准备条件:需要三个服务器,这里我使用了CentOS7 并安装了三个虚拟机,并为各自的虚拟机分配了1GB的内存,在每个 /usr/local/ 下面新建 zookeeper 文件夹,把 zookeeper 的压缩包挪过来,解压,完成后会有 zookeeper-3.4.10 文件夹,进入到文件夹,新建两个文件夹,分别是 data 和 log 文件夹
+ w( ^  x& P& X+ |( Z5 W1 f# i# @
* Q" f, w$ Y  ~0 _* r+ l* T4 r4 b注:上一节单机搭建中已经创建了一个data 文件夹,就不需要重新创建了,直接新建一个 log 文件夹,对另外两个新增的服务需要新建这两个文件夹。* c9 j# u) E; j4 B4 \# R9 F
/ H# C% Q, C% y! y0 B
设置集群
* X& B. h* S* ?% B: F新建完成后,需要编辑 conf/zoo.cfg 文件,三个文件的内容如下
% g: [# [  S. C! Y" t# y8 ~& n) L) p2 E9 }6 {
tickTime=2000
# Z% P7 S5 N9 W2 @2 XinitLimit=107 n( N/ o& J2 E- T! P& ~6 e' f/ ?
syncLimit=5
+ K5 S1 t/ |/ K) FdataDir=/usr/local/zookeeper/zookeeper-3.4.10/data, z6 W& {" }2 C/ j; b+ \
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
5 ~4 p2 I  h: v/ ]' ]; C; o  GclientPort=121818 N. l+ O& m2 {' t3 J1 W, w+ u. i
server.1=192.168.1.7:12888:138880 r9 Q$ L2 k4 e8 M5 o" W' T
server.2=192.168.1.8:12888:13888
! k! y3 v/ o: \4 @server.3=192.168.1.9:12888:13888
% j1 W( ?7 B) gserver.1 中的这个 1 表示的是服务器的标识也可以是其他数字,表示这是第几号服务器,这个标识要和下面我们配置的 myid 的标识一致可以。
/ L2 s9 A' X7 Z* e
& h2 J4 D$ S* A. H( k, N192.168.1.7:12888:13888 为集群中的 ip 地址,第一个端口表示的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口,默认是 3888
$ c5 b) x3 w  a" F' S
7 c' p3 [: k: t1 {$ J8 }现在对上面的配置文件进行解释
9 ~% Q+ c) [) k1 q% d6 |, p
/ z7 d8 k9 d8 `% N+ ~' v$ c, ]tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。4 ?/ G$ _% ~4 d1 Z
- Z6 S1 u4 b( R7 B: c* }! l6 k
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒4 x" p9 D, X5 i$ @

' l$ P; D& P( W) xsyncLimit: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒- }& a+ B. A/ Y9 r& s
3 \4 t# X) }, Y& A6 _% T6 Z0 Q
dataDir: 快照日志的存储路径
. C( A6 F' N# o1 e, K4 W" p9 f) a. k0 l
dataLogDir: 事务日志的存储路径,如果不配置这个那么事务日志会默认存储到dataDir指定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事务日志、快照日志太多: Z  u, f9 K2 Y/ k; R

7 {3 ?% z& A8 Y8 N: tclientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
1 u0 Y8 q9 f! K' @& R1 y
$ r+ W" d9 a1 _创建 myid 文件1 g& d, `5 p+ _' t$ A4 f
在了解完其配置文件后,现在来创建每个集群节点的 myid ,我们上面说过,这个 myid 就是 server.1 的这个 1 ,类似的,需要为集群中的每个服务都指定标识,使用 echo 命令进行创建
1 b# h" _# g7 l& _5 k+ a4 s) ?, ~* o. ^! g
# server.1
" Z9 @4 Z- M9 x2 decho "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid1 W' Z" J# h* Q* e! p2 g5 J3 u" G
# server.2
: {- U- y& d* m( v+ e3 @echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid7 W7 m2 S3 R) J0 t' j
# server.3
' R7 N6 C. N& ~' S) [  V) ~, N* qecho "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid  v9 O( l, M$ `1 U) {
启动服务并测试
. h+ `1 _  X4 T) B配置完成,为每个 zk 服务启动并测试,我在 windows 电脑的测试结果如下
$ d- q( m/ Y5 Z; y5 M
/ }1 ^3 M& Q! l$ o- E1 {7 B! D启动服务(每台都需要执行)( ^" M1 \, w1 P6 |4 _& O- L# W7 B
; @7 i, j  g9 l6 _( j. n- s! I
cd /usr/local/zookeeper/zookeeper-3.4.10/bin+ D! B8 W# D: B0 a, t
./zkServer.sh start7 x- [6 |6 S: h% X
检查服务状态/ o1 S# R/ p4 D! v, s( ^8 Y) r2 F
, m! T0 w. i, Y' H% z9 u1 O
使用 ./zkServer.sh status 命令检查服务状态2 D  f) v$ R4 s# D( T  Q0 a

( H0 `# }' A; Q5 \8 ^2 p192.168.1.7 — follower) F$ w" T+ A4 g/ s# r2 p5 v
  ^5 i; s9 Y; J3 e& l1 U$ N

) b* X* L& w) _" z, Y+ i
1 P: @. M" B$ _. M192.168.1.8 — leader0 z4 z  v! J, \" C3 Q2 w9 P, j- h
4 J4 p; f4 y0 z, x; A, C
  v7 k+ }: R) E$ o0 p( s9 M

* q5 w0 b+ W+ l) L+ `0 w) ^192.168.1.9 — follower' g2 V0 O" S; ]
& d% k7 \$ p+ Z( L0 {
6 r& I9 P/ ?: K1 w0 u1 X
( \0 {  V4 f6 f- @
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。
- b3 a1 T* h" o6 q7 N+ U- m
8 z" Y$ E) E+ V$ m: vKafka 集群搭建6 ]1 `) Z  l/ H1 i7 r) p4 T8 Y+ p
准备条件3 r  v% ~4 i1 s
搭建好的 Zookeeper 集群
& f, q$ J6 Y# e2 Z/ UKafka 压缩包 (https://www.apache.org/dyn/close ... afka_2.12-2.3.0.tgz
" f% A& S4 e2 }' W在 /usr/local 下新建 kafka 文件夹,然后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用 tar -zxvf 压缩包 进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下4 [( T+ [, x6 z# z

5 N) \0 U% l. F0 N3 A) E! T4 f7 c我们可以看到有很多 properties 配置文件,这里主要关注 server.properties 这个文件即可。
$ [& |/ p; }- m: N, p+ m& _! r
% `* [8 H3 P* m  ]+ f, h+ R- ]% X; S4 D2 U; r2 q

. b/ a1 n9 v5 P7 ikafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(可以按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是通过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动! w; f* @+ A  l. O

9 ]3 ^6 N4 W' m  `' }2 q% f4 J修改配置项
6 L9 H- d1 Q5 O6 D. z1 i需要为每个服务都修改一下配置项,也就是server.properties, 需要更新和添加的内容有. U; e1 j% z; C1 D) A3 p
& G, V! }! ]/ C4 k3 J
broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
8 K4 Q% V0 C0 Tlog.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
5 `& x! p8 _. p$ `" ?6 b" d. L
6 R/ Q$ V* y1 ~/ R- q#在log.retention.hours=168 下面新增下面三项6 k% C. O& [1 _8 {& R
message.max.byte=5242880" R9 j. A9 T7 V% ^, L
default.replication.factor=2
# x! T3 d8 i# z5 y( xreplica.fetch.max.bytes=5242880
8 g* S. C. P8 f# M/ ~; H' a9 q" }& |) p! ^
#设置zookeeper的连接端口
2 S' a8 O' m2 h; ezookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181. }( F% H; {( \* u  M! F
配置项的含义
2 v, J7 J8 v0 O: Q& K& T' H" o% ?$ n
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样% t7 P7 Q7 n! j7 K0 o. I" t
port=9092 #当前kafka对外提供服务的端口默认是9092* k8 U1 J' I) F; H8 T; d% r# F: T: `
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
9 h* Z0 a4 I( G) {% cnum.network.threads=3 #这个是borker进行网络处理的线程数
6 D" O& v  ]! V9 L' W+ fnum.io.threads=8 #这个是borker进行I/O处理的线程数
$ M- _8 j$ o/ ]- _; F8 n, qlog.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个! t9 J" o+ R4 C0 s4 z, o2 ?
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能8 I6 X& _' L  J8 m/ ]
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
% \  k+ H5 G5 j7 Tsocket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小& `+ e: \/ P, I- R, l% d) R4 ^; H
num.partitions=1 #默认的分区数,一个topic默认1个分区数
8 K3 i' y5 e2 ]$ |  u) ^/ Elog.retention.hours=168 #默认消息的最大持久化时间,168小时,7天2 X! ]; J1 E9 a
message.max.byte=5242880  #消息保存的最大值5M
% }4 _1 `7 o8 r5 w) J& Ddefault.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务1 M- V: B  j7 c7 _. z# {6 d
replica.fetch.max.bytes=5242880  #取消息的最大直接数3 [" D; x+ u7 h* @0 {& h# |/ J7 N
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
1 ?" T8 W- T! k  {% wlog.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除2 w+ r' F. C7 M' _; m8 d* o! \
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能* S7 _3 r% ~0 V. U
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
$ `0 v8 B& M# X( ]' }启动 Kafka 集群并测试
. I0 i. B1 L; _! V$ f3 x' F启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下+ n/ X  O, n. O: w& H
# 启动后台进程
% o/ l. P# X2 a6 U9 B7 A./kafka-server-start.sh -daemon ../config/server.properties
2 _# m9 b' d0 T' s# m# |) y. W检查服务是否启动9 L, F- j# X6 h& @  G
# 执行命令 jps
, I' A6 V& y5 W2 K8 i6201 QuorumPeerMain' ?1 }/ k2 \) Y5 C
7035 Jps
! l/ d" f* P, F5 o2 q9 P$ {6972 Kafka
5 R3 R/ W! X' U& n, mkafka 已经启动
2 Q' n0 R( e0 A9 [( B创建 Topic 来验证是否创建成功* F9 m) k& {/ p& N# ~0 P9 H* N, ~
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下
2 b: j# R) B" y: N9 `bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
0 c8 r# n) q# o; {对上面的解释% P; c. n% A/ V- F% R

5 U+ [; g- j9 ~6 D+ A! o. C–replication-factor 2 复制两份
! t' P5 v. B% F3 B
! j4 B* N2 l3 D) e9 d2 |- m3 H–partitions 1 创建1个分区% N+ _0 i4 D, v8 c6 f) j$ A
! l& ?2 C, y( F, L" h
–topic 创建主题
0 z  ~! ]2 G' @
! e; ^6 D1 y( Q6 Q* R8 U5 I4 l查看我们的主题是否出创建成功4 X3 E9 o8 d5 l+ A4 y

6 B+ u4 ~* Q5 d; C7 Sbin/kafka-topics.sh --list --zookeeper 192.168.1.7:21818 G1 z7 x# d/ X

. B* f! V0 w# K6 {4 P/ w6 z8 b: T& s9 w, X% i2 d
启动一个服务就能把集群启动起来
' t6 T. G, ]6 }. n/ D$ O8 Y3 g' [1 X1 j! E2 w! @
在一台机器上创建一个发布者
6 p0 x" A7 {% S: O; g8 A" d
, l. ^" v# P0 N1 l. P: g# 创建一个broker,发布者" F" Z/ G; J; G
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
; {7 f' \% X" b' c# I4 m% v在一台服务器上创建一个订阅者
6 }# A. V2 E+ n0 ]! z# R* ^. x. ^( T' p3 [: C4 T& x
# 创建一个consumer, 消费者
. e, k) K+ d+ R' b- ^! k4 Dbin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
7 A; X- R! @+ I! n: Y注意:这里使用 –zookeeper 的话可能出现 zookeeper is not a recognized option 的错误,这是因为 kafka 版本太高,需要使用 --bootstrap-server 指令# M; z7 A, f& _) K4 Z0 }
& N+ l+ c8 ~/ G: D  t' Q+ a9 g# _
测试结果2 D6 B+ z3 O5 |2 _

* j5 i. E* s: a* U: E7 M发布# |  k+ Q* G" I: ^3 {. T: W

6 S- u) R6 {, j/ O5 b- g, F; ~( r# h7 {" S
& x5 R2 V6 c( }
消费
* e; t: J  D" h2 D6 I; {( O" P" r, I5 D2 ]: d, X! v% ^( ?0 l) d, r

! P$ U, [& q$ p) }$ f5 y7 g  i) f2 H3 h" U' q6 k$ B; ^% t% v$ W6 P
其他命令
  f2 n/ E& X0 z- L8 E+ K显示 topic
- t3 L* e4 W, r7 }
: J( B5 c4 z& L5 ~/ ?' B' }- }bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
8 k0 \- G* O2 n8 ]0 S5 _9 b
9 ?/ ^4 c* i; I- o9 e4 ]  Q; t# 显示# b6 e5 u4 k; V: \3 H  v4 I! ~
cxuantopic6 V) m6 O! c' ?: v2 _; a
查看 topic 状态  B1 _; v* k4 O
" Z& y5 J, |0 m$ K0 j/ o
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic
7 B2 k% B! V9 g' g/ f9 ?& x! T9 K+ w( m+ U  U9 p
# 下面是显示的详细信息& Q$ I# X& q5 o- d# i0 @
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:6 C+ P3 J7 H5 r+ P" u9 A5 S/ K
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2- X3 }# |4 @: s- j, {1 U7 R
' i) M! F: F& _1 j7 s7 J; c
# 分区为为1  复制因子为2   主题 cxuantopic 的分区为0
) d* ~# m1 C" G# Replicas: 0,1   复制的为1,2
8 N5 `) [* g! t/ y# ^" _' gLeader 负责给定分区的所有读取和写入的节点,每个节点都会通过随机选择成为 leader。2 k4 r; g9 x  I' I+ b) a

9 b* \8 R+ F; x9 g* gReplicas 是为该分区复制日志的节点列表,无论它们是 Leader 还是当前处于活动状态。
4 E  ^( I/ B3 I; a' s2 u& Z$ q) ?; [: D* p2 K) f" M. ]) O
Isr 是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。
% I5 z% j5 |7 N* W
. ~9 o: M3 u; G5 m至此,kafka 集群搭建完毕。
0 {$ H, T5 X+ `- x) z+ U
$ T/ I7 l7 B% Y7 D2 h  q$ \验证多节点接收数据
6 x, g5 T* Z  C6 U; B  B刚刚我们都使用的是 相同的ip 服务,下面使用其他集群中的节点,验证是否能够接受到服务( {! y1 Y- k) T1 o1 A
; s3 A' C; E& G1 u5 d# [. [* E9 E
在另外两个节点上使用
' P2 n$ Z. C9 K9 }! k: e# I5 T) U7 d. D$ o2 V& \
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning/ }9 _% i, r1 m" }# b6 `8 d1 H! D
然后再使用 broker 进行消息发送,经测试三个节点都可以接受到消息。) i! ~" V$ D1 D6 v% D( y: N
8 C# V+ Z, ]. f2 c1 l4 k
配置详解
( i9 I$ W; [- t5 H  q3 y1 K2 M3 {在搭建 Kafka 的时候我们简单介绍了一下 server.properties 中配置的含义,现在我们来详细介绍一下参数的配置和概念
# w- s$ m2 K7 |; i! d$ X# ~, O/ B, `, ^+ T" |# t, e
常规配置
2 ]2 v) M2 h4 W6 u6 P这些参数是 kafka 中最基本的配置& a3 U. x: G% x! b

" u% H- p$ {8 R) J% Y: ^" X4 u  W0 vbroker.id$ g/ L! G9 Y( N  n, x
每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它可以被设置成其他任意整数,在集群中需要保证每个节点的 broker.id 都是唯一的。4 }0 }" M1 [( Y" S" s

( o3 z3 Y& U% C+ W1 y4 S. Zport* d1 v; k. ~5 \0 y! J
如果使用配置样本来启动 kafka ,它会监听 9092 端口,修改 port 配置参数可以把它设置成其他任意可用的端口。
! T$ _# R& V  ^& Z# `9 R- r% S/ ~+ N6 a* u% r! h
zookeeper.connect
: m# e+ I2 P  c1 n+ L: j9 o! U用于保存 broker 元数据的地址是通过 zookeeper.connect 来指定。 localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义如下:2 F4 Z! C5 Z: L& |2 s

. |. \( G, {) V7 H* rhostname 是 zookeeper 服务器的服务名或 IP 地址
# p) S! v' F5 ]. [
+ H' X/ h/ q- Vport 是 zookeeper 连接的端口
0 o* q! Y$ i0 U3 s
# Z" t5 U' y/ U2 e" k/path 是可选的 zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用跟路径" ?8 A3 C9 _( d9 t

7 X0 d. ~% M( @8 p. h. Ulog.dirs+ W4 z" n0 }0 r# j6 D- f# t
Kafka 把消息都保存在磁盘上,存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据 “最少使用” 原则,把同一分区的日志片段保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。" D. G5 _( @; i1 f6 S
! j/ Z( L4 f& O  Y" s3 }
num.recovery.threads.per.data.dir0 x" @9 e) r' N3 ~- h% f$ F7 x
对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段
  r& V4 n% O5 G; a
( X* P3 ~% M& `- S+ w! O( d服务器正常启动,用于打开每个分区的日志片段;; v, c# s! s& L& Z$ K4 \! M

: A! n' K7 e; y5 _" W服务器崩溃后启动,用于检查和截断每个分区的日志片段;$ m. ]9 ?1 D2 S' ]

# V* z7 {! u8 o服务器正常关闭,用于关闭日志片段) N2 a  z. ?8 W4 R
: U; a1 |: n9 o9 |( V2 A3 p1 }; l
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。' r9 i1 x4 I. h  X; t5 l6 N1 y
0 d8 M* G+ h3 Y8 i5 E
auto.create.topics.enable" n5 j: Z# P2 @3 k
默认情况下,Kafka 会在如下 3 种情况下创建主题  t- a9 z+ ]# h2 U( J

$ f6 v- h; a, J当一个生产者开始往主题写入消息时* ]0 J2 `. T) _$ ~8 ?* T' r- k
( a2 r( }4 s7 R! k
当一个消费者开始从主题读取消息时
) Z6 e5 F* [; K8 p, V. `8 }* f( I% T6 m0 E, z$ w) c
当任意一个客户向主题发送元数据请求时+ h, A- y% N% H) t5 y' H0 F

) M( _2 }1 Q0 B* a6 H/ b8 c! o: tdelete.topic.enable
/ a. k* v# {- \( g7 _$ B9 H. L如果你想要删除一个主题,你可以使用主题管理工具。默认情况下,是不允许删除主题的,delete.topic.enable 的默认值是 false 因此你不能随意删除主题。这是对生产环境的合理性保护,但是在开发环境和测试环境,是可以允许你删除主题的,所以,如果你想要删除主题,需要把 delete.topic.enable 设为 true。
  G, V8 I  Y& [* M' ~* s
8 m, ]' v  L4 Y% Q! K主题默认配置
4 c+ S7 t# N/ e* x5 w* kKafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数
# V8 W+ m( p& e! W) N' j# Q
3 m) E# h: F$ B7 Snum.partitions
' j: ?1 |  C) N* N3 \num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。
+ E5 u5 _5 P  o9 {8 ]& [; }) l) X6 i# ^
default.replication.factor7 o( c# F7 ~& n$ X: `
这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。. e) `- a* A' V% a9 C9 @8 w" m

7 C+ x3 z$ n0 a, h3 N% X3 Hlog.retention.ms
+ w; A8 A+ M- S$ W" QKafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。
5 {" y" i- z  }8 T7 D! w& M' C9 ]' W8 K; f
log.retention.bytes
* W; Y  B% n. I. E# L7 Q  q6 k# Q, L另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。% w: I- m9 j# L6 S/ S% s. B

0 Y5 N0 q0 A* S6 J9 L) xlog.segment.bytes" e2 \- j' v, P; Y
上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。0 b! J+ p8 n5 ]5 J
; r; q4 @+ Z! ]% P. a
log.segment.ms; P$ `6 O  ~- \& r8 Z# ]
上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。$ X& Q: m7 ]7 g2 \

$ F, T7 [7 x" u  v  H5 D, ^7 Fmessage.max.bytes
$ `; \% O9 Q4 r  h' O) N( Pbroker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值. I& Z/ U% M8 t
. |% M1 T8 ?! x- J7 q
这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

返回首页|Archiver|手机版|小黑屋|易陆发现技术论坛 ( 蜀ICP备2026014127号-1 )

GMT+8, 2026-6-12 04:26 , Processed in 0.017871 second(s), 23 queries .

Powered by Discuz! X5.0

© 2001-2026 Discuz! Team.

快速回复 返回顶部 返回列表