1、软件下载1.1 kakfa 下载
F1 l. W: g* X9 ], F
$ a/ a# N4 R! j. X+ j4 O" T6 x! @ 9 D( X" L, M1 Q6 H6 [. a
1.2 zookeeper 下载(1)因为 kafka 要依赖于 zookeeper 做调度,kafka 中实际自带的有 kafka,但是一般建议使用独立的 zookeeper,方便后续升级及公用。
6 w" F J6 W0 L(2)下载地址: ( A7 V" r& d7 v4 b2 o* S; ~
, ~- m; F5 t$ U
' E3 @- v. d) m# ]8 N. @3 f5 v
- w# j; e) t1 X9 Y5 _1.3 下载说明文件都不大,zk 是 9m 多,kafka 是 50 多兆 6 K6 `- k5 z8 L6 d4 [4 R
. W/ D$ A$ |2 p$ f, N
4 F* k" M* F' g3 R4 X2、 kafka 单机部署及集群部署**说明:**北游在本地弄了三台虚拟机,ip 分别为:
1 ]5 t+ X& t$ A: ~. p& |* x+ g
$ G) @: ~) N8 ?; }
* p1 X2 ~, x5 c7 V8 z- `192.168.85.158192.168.85.168192.168.85.178
& j s( Q' D9 I8 V' R# {$ W. I: ]4 R$ V8 T2 D
; M8 P5 A, _3 e. O/ \9 G- {
- a. m0 x+ ~. |3 k. T
5 G. c8 z% _* r6 b( ~$ `% k& ?# }- C, l y! g. A9 i+ N" j" }3 |
8 f; A {% f4 E& K
# R" {4 {3 F6 h: G
4 H) H2 L" x# X) J e: G. C% f7 u+ u( f. A( g* W3 A
( W0 T. {+ ]8 q0 k
复制代码6 W# B; W/ C1 \' p3 T+ r
( \+ T( m5 x9 z; [5 z/ `! [/ |5 T
4 n0 T$ v6 R4 R q3 Y% d. B/ z) { @
( R* k3 W) R3 \% j( D+ X; ~' t
# F D7 I; ]' n: `) i+ N3 E6 G: x e: u; j
+ ^0 U" Z8 ^0 I
# n, \% U* M$ Z3 F3 I
2.1 单机部署(1)上传 jar 包,就不再新建用户了,直接在 root 账户下执行,将 kafka 和 zookeeper 的 tar 包上传到/root/tools 目录下。
B1 S( k9 b% Z. Y" ^2 K+ x7 v- w(2)解压 8 R& Q) c7 R$ U
- d- W2 ^' Q) y7 l7 h4 ?: ^0 }
8 \# `2 m* M) i, d! p; }/ L[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz [root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
# @: u" A5 C! p
; M, f& `3 g+ x- T' z: b; P7 {4 v0 w( s( H+ n$ I
2 \+ @+ G& s# o4 p I5 V- A
2 ]# F+ E4 K. u9 }
0 {( e {0 y/ n8 t# D9 { v0 G+ M& p& J5 G1 k7 E! _
3 ` ^; o# Q; m. z4 i
# A. \( ? q& P' b3 V* H2 h9 e- R
/ A' ~$ W" s: F9 N$ U( R
+ c1 I; Q' n* T复制代码5 _4 \! q/ k* n* I$ V8 E
7 d: }8 O9 b* S5 s, H+ q
& ?7 C# }/ x- b, d6 k. {
5 H w; ]. w w b6 ]: T
1 V2 I+ F, b4 n# X' o* \
& H$ j, m; F* v0 i4 E* N
3 X% U$ v, w4 u( @1 a6 B$ J$ e1 w3 f
(3)配置 zookeeper 及启动
! m2 t) l7 Y0 x6 Y7 T4 Z& I. R; l/ |+ X z' n5 S! _
5 c8 ^8 r. W/ H' B) ^+ _[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin#北游,首先创建个空文件夹,在接下来的配置文件中配置[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf[root@ruanjianlaowang158 conf]# cp zoo_sample.cfg zoo.cfg [root@ruanjianlaowang158 conf]# vi zoo.cfg #单机只改一个值,保存退出。#dataDir=/tmp/zookeeperdataDir=/root/tools/apache-zookeeper-3.5.7-bin/data) @: _+ X" b: O' y
#启动zookeeper[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin[root@ruanjianlaowang158 bin]# ./zkServer.sh start
! r8 ]9 a: X, a' ?" {+ y1 A* ^' B# I4 d( y
' x# [: K. a* `# g; m
, n+ {9 f9 g. ~4 z9 J
6 _" d9 l* s/ p: F& `( M/ q6 G. E8 \( L: Z* J" W
9 q: B- _/ p( U5 S- ]
" A1 V+ _& J1 x# E- V) g
% K5 O% j5 B2 V3 ^1 ^
: X) g# l$ Z! g! ?
5 a1 V' B8 o) _# l6 { O3 R) E复制代码 X: j$ T9 j3 U2 A1 |
& ~$ T, W7 G3 X: E9 @. ]6 m
/ S7 m8 p/ W$ g# `/ X( j/ ^: U9 L, v5 r
/ G/ ? A% P3 N& v: R
7 C$ B% {+ w0 Y b+ L& Q; W. N( M+ g: j9 F& Q
- N8 v8 H5 T: s: a2 f+ o2 {+ J
, h3 i) Z* v+ x3 [
1 ~+ E+ u6 T! {; v
0 q9 h0 J6 p$ e+ a+ p
; }+ C% Y) B3 `9 f
# t# L2 i; p' |# R/ g4 ^, `% m# e: X' q# k: h" H% ]
& o- w F9 m- U0 W7 ?) E9 u! J
2 v. e9 t# _) ~# |6 l
# T/ g, K; s8 g% K
! r# f/ j3 C' f' J# n$ S
7 x/ v1 {5 d* T- Q7 `# U& m) Y" v8 M1 N' m: s( u; ^
(4)配置 kafka 及启动 7 w6 `5 P; o& V# N7 v- u8 Y
! s" L# ?! q. B8 c Y7 b k2 M
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1
" _: ~" n5 x, P1 M1 |9 ?) r0 E#北游,新建个空文件夹[root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data
" w% ~- Z: x6 P {$ p#北游,更改配置文件[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config[root@ruanjianlaowang158 config]# vi server.properties
+ L" P- Q- l7 P#需要改3个值#log.dirs=/tmp/kafka-logslog.dirs=/root/tools/kafka_2.12-2.4.1/data#listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.85.158:9092#zookeeper.connect=localhost:2181zookeeper.connect=192.168.85.158:2181
" E. y6 S, v# {6 d) u#启动kafka[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &
! b0 v" m, O8 z0 p6 g4 s; i
4 m3 [1 p0 r5 F: B4 o& ]2 D5 @+ s: {( C3 P7 r7 Z
. ~: W: Z. Z0 d1 |1 h: c7 t
& g1 X- G& Q4 z! A9 p9 ?* Q* o$ v0 _" ]
1 b z3 e' ]9 k: F J7 N
( L% i8 t. y; R
0 W- i( Y; U& z. \ W) L: d; x) L4 L; I9 w r5 \$ I
$ _( f* C2 m1 ^
复制代码
; o/ c- F% F4 E3 a2 e1 |% T: t2 M$ a6 @5 S
8 w8 f; V$ X8 l
1 u$ Y8 ?( J2 s0 c! P4 ^
3 ~' J4 h. V V, U/ r/ H" M
5 ^1 p5 k+ M# n( @+ I" {8 u3 y* P( _
/ P* X5 R# h- E; n
7 x7 ] G- ]0 R5 V L7 {* X
" k; b! X$ b6 Q4 z) H( y# X& {* }
. x& Z, s% i' {: C* z9 k. @- C
3 h$ o7 N& M( c2 G* V _& j
$ H+ t- c' D+ e5 _9 O; X
# Z$ N1 w) v7 R3 x' \/ n
$ i8 s1 u# i9 o) [! D8 R }
5 l& B8 k/ y5 C: n; R: C# C+ Y& _+ e) Z) p& ] }2 M
5 E* ]6 ]1 i3 k' [
: F4 [4 O2 h9 L5 x4 X
' `) ?, w2 O! {6 d9 G( A6 P2 ?
! Y5 Q" B1 L8 }/ m- R1 d
) X+ Q- b0 F: D& f8 `4 m. e, I2 M2 e" E+ V
+ @- R$ I5 a; ~; y% C8 ^
6 X7 Y) r$ T0 l% q7 Y& q
0 Z9 ]3 @9 N9 w P* C
启动完毕,单机验证就不验证了,直接在集群中进行验证。 2.2 集群部署(1)集群方式,首先把上面的单机模式,再在 192.168.85.168 和 192.168.85.178 服务器上先解压配置一遍。 : r7 W ^5 H/ l3 U3 ^/ A; U
(2)zookeeper 是还是更改 zoo.cfg
U/ Z: _0 n( V# J% p8 c158,168,178 三台服务器一样: 7 m. B4 X+ v0 P) L0 R( }
3 z; K u8 k! ^8 s3 Z2 a0 ^
. F6 {$ f# w R; K D: s$ C[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
2 O X' }, l6 I7 i; R% y6 l( k[root@ruanjianlaowang158 conf]# vi zoo.cfg#其他不变,最后面新加,三行,三台服务器配置一样,北游server.1=192.168.85.158:2888:3888server.2=192.168.85.168:2888:3888server.3=192.168.85.178:2888:3888. H. X8 |4 _% ~ k( w" I0 K7 z
158服务器执行:echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid168服务器执行:echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid178服务器执行:echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
8 O6 A5 @ p" h4 R! e# v& M) v8 w6 N I- U G: B+ F
- y9 L) i) t5 I7 H2 R( J( _* v
+ s" \( d5 C3 Z5 |! S- z& P+ M5 Y# M9 \% |6 X" ~ l
8 U+ {1 i! F; ~5 N, a$ N* j. p+ S# h8 B' D' \/ [- @
8 p+ J' e$ a1 k6 a% B& P3 D/ n3 m( N/ n' Y4 D) s
. X5 d$ L4 a' z' b
& v' }" {9 u! r1 B# z: p
复制代码
: `' Y* N9 @0 E0 d. B
1 Q" t9 J) W7 T, D4 A1 \! G1 [! A6 b. g: I: y5 |7 l3 f
! R9 a! w, |9 F% Q+ V+ p
6 v$ o/ z/ ^# u; T
/ |! z7 w1 X8 u% ^
P0 p8 z; N! Q6 ~) m
9 [4 H) Z5 F2 m5 U! e
8 f( o6 j: @! S$ O- d+ O* `
4 P( {& a6 ?. G8 n3 R. |& X9 U2 ]4 t4 ~$ P; F
6 I% J, `* l, @
1 g2 t$ L* g$ V- O( {' Z
/ U/ ^0 }) D/ ?5 z2 o
8 {/ v4 c7 C1 i- g, [' |
6 z% g1 ~) X$ V% c# s9 h, F9 q+ k; {1 p. ^
" B, u# L) k4 {; g4 Y0 U; J
- R* d- U4 {! Q& ?3 P
/ A1 D! s) T5 Z6 k. G
(3)kafka 集群配置
7 ~( `9 x5 a( R* ?: [. A* a7 m0 c$ A0 S
# [/ S: L+ P9 c$ W5 K[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config[root@ruanjianlaowang158 config]# vi server.properties #broker.id 三台服务器不一样,158服务器设置为1,168服务器设置为2,178服务器设置为3 0 r; n0 _0 c6 \3 ?% T; ~
broker.id=1#三个服务器配置一样zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181
& t) P q0 n: X' z1 L! l: C
# R5 b& n/ a" @6 }. ?/ ~0 _4 V8 W
( _+ g% ?/ t, k+ J) W' [
3 P8 D; p2 \! P! F9 M P- N) {5 N0 K$ O K
: J+ a) ?" j$ ?% K! z- a( E
. D2 ?* y" j) E! t4 p6 V9 G. [
0 E' `, L' M" Y# [, z J# P" |& c0 U, g5 b; D; Q6 @6 ~
& j. w" [ i" s! R1 z- d+ M+ ^
7 \ h: }( Q( R- `: p3 `8 g复制代码* o; H0 ^4 w# v$ _
* e9 B4 e6 ~# b7 z2 Z. i4 V# q. G, Q! u* z# @
0 r2 I- F; j- i# S6 S- m8 I% B" o7 ]/ t
1 [0 Q7 w) K3 F/ x4 P
# J z2 f2 E8 l& U. b( C4 x! Q8 e" O( R' k0 D5 t
- |/ h* A- j+ r/ H
5 X, ~ M. m" A
+ h2 w0 W; k( E' i" n8 |) b/ C. F# z# H+ n/ \
( a8 k: C9 N, o$ L J
Kafka 常用 Broker 配置说明: % e3 g+ y) l5 H* S# n& n* Q
: U$ h* N# x, Y+ x. H0 g5 |
5 [0 v: b6 }' P ?- d2 Q, |7 q9 U1 U9 R1 P# t: |7 b& Y
192.168.85.168:2181,192.168.85.178:2181 | ZooKeeper 服务器地址,多台用","间隔 | , j7 r4 G8 N- Z n
(4)集群启动 8 o2 t2 t2 _5 q- o# M, j, C
启动方式跟单机一样:
' ^. Z$ |% M6 a& |) X' B
/ Y, I {# G s; U) t- c6 n
|) H! `7 w! J#启动zookeeper[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin[root@ruanjianlaowang158 bin]# ./zkServer.sh start0 ?) M6 `- y+ \ ?3 g0 p5 O
#启动kafka[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &' c& d s6 ~( F% r$ U
0 h# m" ?& c- Q% e3 x& b; u' O
# g9 N9 }+ v% j5 u' L
+ O$ @9 j( z) x1 u5 [. K
8 I4 @. W( @* x/ u& o2 M
# A2 Q. v8 U S
& D6 h( X. u R1 f5 g' d3 h+ ]" F, c6 I* i
/ M! ^" O* [$ A% c2 H% y }( }, p' p ? G6 U* {+ w' D
. ^9 S: R$ |3 D3 @) [0 e: ]复制代码
$ X/ y2 y3 Y/ T7 e
4 A+ l; d6 C; }" @, Y" g! T
0 i$ m6 O, O* ]$ x! O' R
`$ N8 J, _% R9 L% Q
0 ?9 \8 i4 v+ ?$ A; E8 i* d6 \6 V- v0 n0 o; q
' X: W! M+ q7 ~' x: A0 m, |, C
2 V4 n- u$ u8 H; g$ P9 d/ j8 {1 f5 t& X& Y
4 x& m+ s: o4 z% i. `
' | Y, N+ f2 @7 r! Q. Y S; S' U0 X6 i9 G) M
0 X, p2 H; v! a% t/ }(5)注意点 * K {; p8 d2 z8 X8 v# s: Q
- Y; b$ j3 U, V B- C, w" o6 h
: L: ]; s7 a5 V- ]2 b A集群启动的时候,单机那台服务器(158)可能会报:Kafka:Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties.方案:在158服务器data中有个文件:meta.properties,文件中的broker.id也需要修改成与server.properties中的broker.id一样,所以造成了这个问题。) g/ x! K" S! K( V3 d$ P, E2 U
0 Z0 C! \& I% V" ~4 v" p: w3 S5 D' ]1 P# x' c
! ?3 Q" ^! Q2 M- \, ]
, h! h% B! s0 ~% l
Z7 x0 U3 X3 k9 H8 E( v2 F
5 l' D' r' u* c/ k
7 I8 i( r, h! E; C4 b/ a9 O# R
( ^" @) g$ w5 ?6 H, l- |5 ?: Z
) x8 c& j# w# i8 Q( Y3 I# v! N9 J' t( V# u# N5 N
! i! o; v& C \8 u
复制代码
& d6 H& w X( b& d6 U3 u& b
8 B+ Q; c. M0 T3 s A& @7 z- ?8 y( d, H
: Q; z3 x z' P* Q0 n1 h# B7 a. p; x$ S# L2 r( I- t. S
( J1 B7 e1 l' w' \. n! v! j
* n! r% S: r- M; I9 z
: w) h% @5 J8 ~) @(6)创建个 topic,后面 springboot 项目测试使用。 9 V; E! U: E x$ K$ q7 {: b
; }( D0 U3 V- I4 D, l7 w u
2 ?2 h I, W8 P8 m! I' a[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin[root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa' m4 _5 b6 I: j! w1 L/ \& ?; H
4 O( e4 T+ }5 W
0 B: P& L: F* S0 v) Q* ]! m+ B6 b" k- c8 D: S% A# X9 ^
4 P `& g5 m" A. m; `' P3 b9 o) h+ B% k8 s
' ~: s2 |8 o. B% H* U& i
* ]6 b$ h% v6 o. n. }# l* i: l9 i- Z4 V( \9 u& b3 l
3 A( i _; W8 H* R
( c1 ]; b& B, x: l+ ]8 t* Y" O$ Q$ |1 H
复制代码. d7 X9 W) u: t4 ?
* O" k& b; s& J+ A- `
S# Q& C# r1 b8 x
! X6 f) b# o9 E- b- H+ C+ X9 B6 d8 @0 w. c( k- r
$ f- ^4 ?- n8 g+ `( H
0 p6 w: P7 B7 G3、结合 springboot 项目3.1 pom 文件
H* p, J1 N! c/ N8 a' ]' ]
; [$ i& A, W" d# G<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.itany</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka</name> <description>Demo project for Spring Boot</description>
( b) A# n, D: Q5 G+ q% P <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>6 K* F$ k& g, f2 a/ B( W* H
</project>
1 z/ }' x1 F4 f& |2 h1 D& K% _/ Z: R# V4 w e6 ` n
6 k ~. @: V* {; N
( I3 L# i' k4 g$ K7 [/ r- F- u3 Z3 ?% [5 K
r$ `6 {7 }1 c5 v, E3 ^2 _
9 K. \+ @2 T/ Y* a* b+ {+ B- Y9 k1 P& |* v1 e0 J
" E2 E4 S- n! M) m
: B" U+ G. u8 ]. G. H4 U: k6 ]$ j. `) [0 ]
) R$ G7 X$ l6 W% a! `5 z& t: F
复制代码
0 q2 b! v5 S" v) Z" m' y! U# X& Y2 H
$ |4 s$ n8 ]* o9 e$ [, f
0 z7 V9 o# |' t4 q
) f0 X# h+ m2 c- C; ^1 R% d
7 f" e# D! S* I* g X& T0 x# b3 p: b3 g4 V, a1 T! z' s
. Z) E# q1 V# ^& F. k8 {9 O* t" l- B, \- {4 r6 R" j' }2 i
5 @7 s$ T) E) ^3 C( K c9 c1 I) P" g2 V
9 Z& I8 T" x# y Q
$ l+ p" C. M" Z+ i2 O! ?/ K% ~# u& U1 M0 b9 p
5 L& [0 x& `; o1 C8 A) m
9 p3 g7 h6 y- ]* R
8 H7 H, z: e% k# f; m% Y7 {
% D! ]) r! ~$ U! X9 _* p2 N! S. ]. p
# i" s# {4 R1 F# M7 _
! s2 F% l! m3 g8 S; U+ i
" G( F8 d8 F/ t; e# C7 B$ {: s. v: n" x: @3 _0 T- j# n
" a9 v+ J" S Z! a" J9 X; b8 t7 A( ^0 n
5 w; c! |; P% X# A; }1 @
5 L2 `7 |% Q0 v4 A3 R: h
/ o, }% g5 b7 D; H( m: W! R
0 ^ Y- R2 u. w8 [
* g }/ S4 W5 Q( X& | N. k d3 W$ J3 a
9 @2 f$ Z' G7 r4 h- k; b. w
+ U _. K; I2 r0 A: e G: F9 B$ Z6 u: N/ L4 v# j% ]
! B+ m0 N2 E1 \
. q3 A+ h/ m; t8 O2 n; p
( N5 a$ s4 k, [5 W# \$ j
! |2 o$ C5 F' p) w* s5 \
& \; `3 ~0 y, n* T# H C% n& L5 l# F
( k% A" J; p; c' I+ P0 g' ^; _3 E" c5 A$ [; L O# `
& H$ G& {( B+ H9 P% @$ c+ J3 k1 r& R- C
3 y ~0 N: L3 c n3 g7 A; V2 i1 r) f+ S7 x; J. q% W
; v ^# ]7 _% A
& g: H" a0 d/ ^* M* j
说明:
B$ w* p/ K9 x5 }主要就两个 gav,一个是 spring-boot-starter-web,启动 web 服务使用;一个是 spring-kafka,这个是 springboot 集成额 kafka 核心包。 3.2 application.yml
: X$ K/ V! ?& d1 @7 S" }
9 O' r8 O' N6 D' zspring: kafka: # 北游,kafka集群服务器地址 bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# c3 p- G5 X) G# J
* z7 x( k' a! g0 G* G3 I
! `4 B7 u0 H; w* N0 F+ q K
! [& G/ k7 b0 C: l+ O+ {! p2 @1 C1 q# u, M/ K. K" \
5 }% E2 f0 K) A Y, q/ b3 ]' H6 V, f) z$ c' J& N5 w- C
! |% ^3 Z; m. S- E8 q2 M
: j: |" w) F# ?0 F8 f9 M6 T% l5 W |% V H9 L$ F
# D6 u$ f6 d0 z0 j M9 {6 W
复制代码
4 j8 N# G4 H, P, z1 O
' {, g( Z# }$ @, I8 [! y) r2 k: S% \
8 S& K: `, o3 Q( d4 q9 ]2 a; f5 _; m- o3 Z; m9 A# g5 F* m
0 Q8 Z6 o7 c( z1 u" k9 }- z8 w$ |6 l/ ~+ p% m
$ h& l9 @; n4 K& d5 K3 t) X0 W
6 c9 `& \( G- s0 B! c, v8 u6 f
# Q' k+ p" p$ ~3 W5 R
: [" v0 T: d* }; c% w
% N: L/ m7 X) R* a$ O( S
0 P4 {: j/ C: [$ }$ q5 T6 |$ l# ^) @- e+ }0 W
5 n0 ?' |7 y# l/ f
' n1 q% E9 s/ r5 Q* `& m* P9 s3.3 producer(消息生产者)* x8 Q; f/ b! l7 V
: _3 p/ b% C" b. u! m( E( E
@RestControllerpublic class KafkaProducer { @Autowired private KafkaTemplate template; //北游,topic使用上测试创建的aaaa @RequestMapping("/sendMsg") public String sendMsg(String topic, String message){ template.send(topic,message); return "success"; }}
2 e3 A c5 [* c9 [( c! S+ G K% A+ g- j
. x9 X( O! x3 \2 O K
$ x) j/ M% F) Y) {) _* i2 T
! B6 @. e5 {. a% w' G( V/ f! D, Q7 }- Q6 ^/ e
4 X- Y( O* f$ H; N- g# N% U( L
) y) F, P' l3 G, B9 N- B, X2 g
I: ^1 E+ R& g- O2 B: Q" V" d2 i; B& d
" T4 ?0 Z% M4 I6 H8 s, b/ R! H复制代码4 \! q: C$ @/ B3 w# c7 X+ n
; k! ]9 k' @0 P; A
5 G6 ]: e/ B$ r
. M$ T8 Z+ e% k( A
; a1 F: _3 m- M0 H
; A: P. T# E1 j/ M
6 R% a' H0 D5 f" ~
4 i$ I" Y- ?: X! g+ l
( U+ A3 ^8 M' Z/ W
0 P U% B3 r) S% }- ]6 N. z
) k( D w' _- ^ D/ \
1 D$ U" M8 Y2 b/ S9 ~; L
8 S v+ }+ k% i- h/ F
9 W1 j) H6 K3 T% [4 ]9 b/ a5 Q6 P7 f# d$ a1 G5 P
; Y4 a7 o+ E$ z
3.4 consumer(消费者)8 \. \2 q4 ?+ t5 G
5 F; S0 B p$ @$ K+ k$ `@Componentpublic class KafkaConsumer { //北游,这里是监控aaaa这个topic,直接打印到idea中,北游 @KafkaListener(topics = {"aaaa"}) public void listen(ConsumerRecord record){ System.out.println(record.topic()+":"+record.value()); }}- ^- Z; {2 {0 } |( O! }
. Z: D2 w$ V# c+ P8 _4 t: g/ }6 I5 i
7 D" w2 t' n; }! ~
2 k% g% k8 Q( y' t- [* T
; c/ N3 _+ g" ^- b5 s8 i, e1 w# Y" p5 {) H
9 i! d+ Y1 J- [# H3 q/ g$ e! U* c/ P& o, x! \; ~4 F
1 J( R3 s& L. r2 v# W+ d) m1 D
* q' K. H1 N- h* A. O4 w4 C- ^: M- A/ J$ H& m/ n N; i
复制代码4 T! O% f% E/ ^: H0 [ U' O
+ o- ~1 K/ I/ \7 Q) Q, M& p% u N: u! q% @& p: U0 I
6 v; C! P. ?$ P, T
4 K, v5 `0 E+ L; t7 X$ Y' K/ Q/ X5 y( S/ J% d, j8 W+ Q$ Z9 c
6 [; }" ?$ Q( D3 {' J6 K$ V
1 R0 U& s" a1 j0 J8 l
A/ [% [; W/ E% [
1 f1 i; z8 K+ E5 i0 q2 `* P5 N
% F2 l' U6 q: {/ G
( s7 P2 a" v5 Q$ p' P7 e6 ?; V8 L! M1 q$ q( A1 z/ m
3.5 验证结果(1)浏览器上输入
( ^9 ]) E! |9 |3 E8 h
% \/ Y+ k- Z. e7 s+ K0 S
9 b o+ {( E2 k) Ohttp://localhost:8080/sendMsg?topic=aaaa&message=bbbb
: s4 {' e0 C0 q$ l, w5 p( C1 B5 u! C* ?" ?
) O. U/ c9 s+ H
' z$ g. d7 O# J; [( D# \5 f0 [% k( d4 T( X5 |) {9 |. n! P( m
. o$ J+ V! G' }
! O+ h: O, Y7 r$ Q5 O3 @2 D) Q5 M8 i% E! @5 I
& s2 w& R/ G( c( s, {; I% B! W F6 o7 R6 n
% I$ h/ P) m3 L' V复制代码
}; v- F# i9 E: q; L0 X3 t& f! D! R7 ^. ]. { W9 a
1 D! z; Z& x) T* {# C: n: W5 ]$ Q7 i
& k& r" o {- M" `. H4 M& {$ I$ W: ]
+ Q+ Z/ U& o$ w# [9 W3 H9 W
, B- u! K4 `' z" L! r o8 {) G
(2)北游的 idea 控制台打印信息 & a$ G9 L& L! j2 y* a
3 b: o$ b; {7 r4 |* _% Z
- {! @: q! a) ^; d3 l
6 e- `- {$ R8 f9 W4 g$ C! D
1 H$ w- b. w6 q8 w! t5 V. W" ]" e 二、Kafka 副本机制1、什么是副本机制:通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝 2、副本机制的好处:2.1 提供数据冗余系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性 2.2 提供高伸缩性支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量 2.3 改善数据局部性允许将数据放入与用户地理位置相近的地方,从而降低系统延时。 3、kafka 的副本(1)、 本质就是一个只能追加写消息的日志文件 6 e. v4 H$ ^3 j+ o" w
(2)、同一个分区下的所有副本保存有相同的消息序列 + m6 S4 N/ l- v8 x& E
(3)、副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用(Kafka 是有若干主题概,每个主题可进一步划分成若干个分区。每个分区配置有若干个副本)
% F' J5 h) p; G% y) B. a如下:有 3 台 Broker 的 Kafka 集群上的副本分布情况 ( p+ g0 B" } V/ y T% g

) D% h% q( c. _. e3 G( j7 o: m . S& O6 E$ {6 `
4、kafka 如何保证同一个分区下的所有副本保存有相同的消息序列:基于领导者(Leader-based)的副本机制
& y5 d+ Q3 `' z9 T工作原理如图: 3 V: g8 @. m0 u1 E) N
) D# A% R4 n+ u: n; q
2 }$ L$ O: h) f: g2 j* Z: K5 w
1 b" R4 q: r2 P" R9 K
(1)、Kafka 中分成两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。 4 l0 d- D7 E( c4 Z: l+ T' Y
(2)、Kafka 中,追随者副本是不对外提供服务的。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。(因此目前 kafka 只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性)
6 R$ e+ \- j8 Z4 {1 Y! O8 x(3)、领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。 5、kafka 追随者副本到底在什么条件下才算与 Leader 同步Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的 6、kafka In-sync Replicas(ISR)(1)、ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本 ! ]7 Z- e$ q4 S4 B0 I
(2)、通过 Broker 端 replica.lag.time.max.ms 参数(Follower 副本能够落后 Leader 副本的最长时间间隔)值来控制哪个追随者副本与 Leader 同步?只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。 ) s+ Q% N" f2 `4 ~. a8 y4 n
(3)、ISR 是一个动态调整的集合,而非静态不变的。
0 o: y$ f7 U. `某个追随者副本从领导者副本中拉取数据的过程持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
) M4 u! D3 S8 I$ ~倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。 % `. V) ^" l+ {, j6 P
(4)、ISR 集合为空则 leader 副本也挂了,这个分区就不可用了,producer 也无法向这个分区发送任何消息了。(反之 leader 副本挂了可以从 ISR 集合中选举 leader 副本) 7、kafka leader 副本所在 broker 挂了,leader 副本如何选举(1)、ISR 不为空,从 ISR 中选举 5 T& p3 }# H" u: ]: x+ b
(2)、ISR 为空,Kafka 也可以从不在 ISR 中的存活副本中选举,这个过程称为 Unclean 领导者选举,通过 Broker 端参数unclean.leader.election.enable控制是否允许 Unclean 领导者选举。
7 J5 C: r4 T6 N0 G( f4 V/ t开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
3 F# Q" N' q- [. s- B( a/ a一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。 0 g+ P. }# F6 Y+ U" H
强烈建议不要开启 unclean leader election,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
9 ~0 N: ]/ p5 n; e' zps1:leader 副本的选举也可以理解为分区 leader 的选举
9 @+ s% k6 }4 `) J- kps2:broker 的 leader 选举与分区 leader 的选举不同,
! Q; P( P0 z9 g# n5 t8 MKafka 的 Leader 选举是通过在 zookeeper 上创建/controller 临时节点来实现 leader 选举,并在该节点中写入当前 broker 的信息 1 ~3 n: ^3 _& m8 P& r+ @' k: `
0 }) Y& C# E7 H6 o9 A2 _* c; O* I' I3 J% Q/ ^4 v3 `
{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”}
0 V! e0 n, {+ t7 t' \4 x5 D: o' g' B4 J- [1 R% R! W
" }4 I y( ]. ?" d) y, k1 Q( B
( e; A* ^ j9 A+ Y. p4 G' `! n! X% L! Z" M3 f) _) d; u, p8 v
& F2 x; ^ u1 l* `
3 h4 ~) G5 h2 s
' L& ~2 f* O& `* f+ u) I# A) q
; |% g; S7 Y" x) M% {% e% O: T5 X1 O# r8 ~
0 h4 M2 v5 c+ T
复制代码
4 t7 M! N6 n; O# v6 y
, P* ^4 {4 b, k- C5 V+ R* x( x* N9 ~
2 c; G3 v& @" j
) Y& N2 [2 p/ }3 ~ R: z
7 L! v" d& |* G; Z5 O# [4 d( U. d
3 k4 M i* E8 G- B" K6 D# O6 v9 E- J
利用 Zookeeper 的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的 broker 即为 leader,即先到先得原则,leader 也就是集群中的 controller,负责集群中所有大小事务。
' e% D9 C' x4 g; x b' U当 leader 和 zookeeper 失去连接时,临时节点会删除,而其他 broker 会监听该节点的变化,当节点删除时,其他 broker 会收到事件通知,重新发起 leader 选举 + O0 r! b. z2 O4 r4 T% [, q- q
再给你们留个小问题:如果允许 Follower 副本对外提供读服务,你觉得应该如何避免或缓解因 Follower 副本与 Leader 副本不同步而导致的数据不一致的情形? 三、实时日志统计流程1、项目流程在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人认为直接使用 Storm 进行实时处理,去掉 Kafka 环节;也有认为直接使用 Kafka 的 API 去消费,去掉 Storm 的消费环节等等,但是最终组内还是一致决定使用这套方案,原因有如下几点:
7 k2 U9 Q4 V3 v7 }% m业务模块化 功能组件化 9 H) Y/ h* z3 E$ Z
- g1 O; I; h E7 V
我们认为,Kafka 在整个环节中充当的职责应该单一,这项目的整个环节她就是一个中间件,下面用一个图来说明这个原因,如下图所示: 2 P" J3 q( g) @* [
$ S7 m; @) |, o! F
1 b. ]- h/ A1 R. _. Y; H; y& H- X: E X) L, `
整个项目流程如上图所示,这样划分使得各个业务模块化,功能更加的清晰明了。 1 q$ `: \1 m; q5 X
Data Collection 9 f1 b4 r ]9 [$ [& |
8 L9 C. m6 m2 e+ x" V
负责从各个节点上实时收集用户上报的日志数据,我们选用的是 Apache 的 Flume NG 来实现。 / ? \1 _- v, Z1 ^
Data Access + f$ s, y! g" a
7 r$ k3 `, B' v0 S' [$ a. ^8 }: \) ^0 c
由于收集的数据的速度和数据处理的速度不一定是一致的,因此,这里添加了一个中间件来做处理,所使用的是 Apache 的 Kafka,关于 Kafka 集群部署。另外,有一部分数据是流向 HDFS 分布式文件系统了的,方便于为离线统计业务提供数据源。
. B/ a( i3 G# I- N: P1 U/ uStream Computing & l6 H9 X9 ]: q' R, m+ s
* B6 @3 g% J9 o" {0 F( a" [在收集到数据后,我们需要对这些数据做实时处理,所选用的是 Apache 的 Storm。关于 Storm 的集群搭建部署博客后面补上,较为简单。
) P2 U! ?! F- n c. EData Output $ m6 s8 z2 R9 T4 |
4 p5 H* d+ Z1 C$ i
在使用 Storm 对数据做处理后,我们需要将处理后的结果做持久化,由于对响应速度要求较高,这里采用 Redis+MySQL 来做持久化。整个项目的流程架构图,如下图所示:
0 x7 `; C! {( L6 q& h! G5 ?% i # {; S& D- n# p) k; ^
# ^4 ^& m/ m; r' z7 N
2、FlumeFlume 是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS 等),便于收集数据。Flume 提供了丰富的日志源收集类型,有:Console、RPC、Text、Tail、Syslog、Exec 等数据源的收集,在我们的日志系统中目前我们所使用的是 spooldir 方式进行日志文件采集,配置内容信息如下所示: 1 P; r, m, B& @4 p" a* t
& u, N( u" I% U. s" Q$ Y
2 B) a$ B! }6 y+ K& Wproducer.sources.s.type = spooldirproducer.sources.s.spoolDir = /home/hadoop/dir/logdfs
7 [% ^: N% z; r9 o# }) f3 a) j( F
& v; f' z$ Z! U6 Z9 j+ a# G: l% ?& ~+ W) e; Z& o8 W
; M5 f$ t+ L- l6 x
1 `; A* B5 W2 f! q: l5 H4 I8 h q) j( N& `7 j" p6 H/ c6 t8 `- V
7 |7 S; J- E/ y& r0 l5 V( H
+ a0 P; ^2 ^# I F' j3 T5 d8 X' \5 g; ~( e) ^
; L! h1 H7 `4 s
6 X) q: s3 j H7 i: ?
复制代码
; g) j' f; K8 K, S7 R7 w9 }$ S& C6 m3 q
* B! ^7 Z }' k# k; ^
& R+ J, c: Z+ |1 H
! O' \1 h: p) b6 B2 k. k3 x9 o, Y% a% Y( n! ^4 q \
5 C! L- [" |; V0 S f7 ~ R
: s. P! c* H$ }. p/ o1 C: T当然,Flume 的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC 等,这里我们系统所使用的是 Kafka 中间件来接收,配置内容如下所示: " v0 ^: z4 r( b7 T
% E5 x9 C0 g5 R8 `
( _6 l; } J2 i2 U
producer.sinks.r.type = org.apache.flume.plugins.KafkaSinkproducer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092producer.sinks.r.partition.key=0producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartitionproducer.sinks.r.serializer.class=kafka.serializer.StringEncoderproducer.sinks.r.request.required.acks=0producer.sinks.r.max.message.size=1000000producer.sinks.r.producer.type=syncproducer.sinks.r.custom.encoding=UTF-8producer.sinks.r.custom.topic.name=test1 _! t( K8 h; i: _" t* ?( G; P+ i
$ g8 K1 }8 F& @# p
9 y9 ^, Y% g0 z1 `- {( E
: j- g0 r; F! X
# Y+ \4 C( v* u
9 |! {* F/ B& V% S
7 H, w+ e1 k2 k5 \- y5 I
3 l0 X3 e6 r3 D! j, `4 W' r! u. j9 g& [0 b
. M- ~! @1 _: l% S" b6 E& S9 U5 p
1 V- y$ m1 T# r+ N* m2 L T复制代码
; h" B' ]2 }# J& c
( U- c. O. \1 ]; u$ `' \/ h! ^9 q$ f- _
/ l$ B- ~8 Y$ E3 u8 Z) g% R
; {2 }5 ?: B- R4 ?, Q) M; u/ m3 H: B2 R2 Q# ?2 i7 T- s
L, @3 i S$ S, E
& c0 m) l9 ^/ S$ M9 K& l
4 \. x% i8 I6 W5 X
) O2 z, r1 z- w4 P
" I! L5 }$ [- e; U# f3 t7 M' Q4 E" g8 }5 K7 r& S
" m5 d8 Y% `3 Z6 Q4 d+ v4 E, Q \6 v: g8 C- _/ A0 t
: X9 A! @, E% ^. |3、KafkaKafka 是一种提供高吞吐量的分布式发布订阅消息系统,她的特性如下所示:
* `$ H7 i! S( [4 I( m0 D2 V通过磁盘数据结构提供消息的持久化,这种结构对于即使数据达到 TB+级别的消息,存储也能够保持长时间的稳定。 搞吞吐特性使得 Kafka 即使使用普通的机器硬件,也可以支持每秒数 10W 的消息。 能够通过 Kafka Cluster 和 Consumer Cluster 来 Partition 消息。
- r8 r* C0 F0 Z ; V' q' E P. K2 ?5 N6 a
Kafka 的目的是提供一个发布订阅解决方案,他可以处理 Consumer 网站中的所有流动数据,在网页浏览,搜索以及用户的一些行为,这些动作是较为关键的因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于 Hadoop 这样的日志数据和离线计算系统,这样的方案是一个解决实时处理较好的一种方案。
3 R9 U7 ]5 c+ m# y关于 Kafka 集群的搭建部署和使用,上面已经写了,不会的朋友翻上去再看一下,这里就不赘述了。 4、StormTwitter 将 Storm 开源了,这是一个分布式的、容错的实时计算系统,已被贡献到 Apache 基金会,下载地址如下所示:
j% u8 x+ p& C/ x
8 \! v' C1 U- ^& H0 n7 o. y' v+ P* J) h, o8 J- e
http://storm.apache.org/downloads.html
4 z8 m0 d( D9 n$ @# A, ]: P, @
3 S7 L9 e2 B3 l
" ~; P5 Z7 s8 d/ n9 ?5 L; ?7 K: y4 [
/ s. L. S0 a f9 d1 U- F4 M, I
- j5 N8 [0 V1 f. {( q) k4 k! h; B
, p% Z. P& d' M( [9 A6 Q6 L) ^
- l0 J0 S6 B8 p
2 C. @6 { ~& n! _) ?9 k/ j! @
8 w1 |, L, o- l复制代码6 y0 d2 C6 a& [8 X8 d
# T! @2 c* u) o1 z1 Y$ r6 f0 o# q
+ e6 z% ]8 I6 E, l
! {' u! ^, p* m5 D4 C0 G' K7 f5 {2 i+ I& q5 w. t* T5 {
7 w4 G) a+ D- T8 I8 W
7 m3 H3 U( i, h/ U5 CStorm 的主要特点如下:
# g7 D; e- g0 p* D$ O简单的编程模型。类似于 MapReduce 降低了并行批处理复杂性,Storm 降低了进行实时处理的复杂性。 可以使用各种编程语言。你可以在 Storm 之上使用各种编程语言。默认支持 Clojure、Java、Ruby 和 Python。要增加对其他语言的支持,只需实现一个简单的 Storm 通信协议即可。 容错性。Storm 会管理工作进程和节点的故障。 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。 可靠的消息处理。Storm 保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。 快速。系统的设计保证了消息能得到快速的处理,使用ØMQ 作为其底层消息队列。 本地模式。Storm 有一个本地模式,可以在处理过程中完全模拟 Storm 集群。这让你可以快速进行开发和单元测试。
7 E9 A' A4 O1 r3 d/ | 4 s5 t! L9 Y3 r" j
Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。 0 G2 E9 |1 L, q. _
Nimbus 和 Supervisor 都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由 Apache 的 ZooKeeper 来完成的。
* s+ J6 c# D' p& t2 f$ jStorm 的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。 " i) U+ b c2 ^" V3 y1 @0 r
Stream 是被处理的数据。 Spout 是数据源。 Bolt 处理数据。 Task 是运行于 Spout 或 Bolt 中的 线程。 Worker 是运行这些线程的进程。 Stream Grouping 规定了 Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为 Shuffle),或者根据字段值分配(术语为 Fields),或者广播(术语为 All),或者总是发给一个 Task(术语为 Global),也可以不关心该数据(术语为 None),或者由自定义逻辑来决定(术语为 Direct)。 Topology 是由 Stream Grouping 连接起来的 Spout 和 Bolt 节点网络。在 Storm Concepts 页面里对这些术语有更详细的描述。 : Z7 p- |0 Y1 m' |/ d" E
- r( J/ Q5 r8 U F( j9 q B
关于 Storm 集群的搭建部署,博客在下一篇中更新,到时候会将更新地址附在这里,这里就先不对 Storm 集群的搭建部署做过多的赘述了。 5、总结Kafka 日志消息保存时间总结 Kafka 作为一个高吞吐的消息中间件和传统的消息中间件一个很大的不同点就在于它的日志实际上是以日志的方式默认保存在/kafka-logs 文件夹中的。虽然默认有 7 天清楚的机制,但是在数据量大,而磁盘容量不足的情况下,经常出现无法写入的情况。如何调整 Kafka 的一些默认参数就显得比较关键了。这里笔者整理了一些常见的配置参数供大家参考: ! S" ^- R* H0 H# W6 l
分段策略属性 $ Y& u0 D7 y) U
2 T \% C0 o M/ O7 F& P; O$ T; g: x, f3 @4 \& z4 P/ Q0 Y
+ h2 M% \' n, D3 d3 }日志刷新策略 : A- D3 _/ h# I2 {& w% H! x* r
Kafka 的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。
8 n7 O9 A4 o4 W( P% ?
5 R) q9 G5 D0 s4 O+ h$ f. z6 t F D6 J' H! P
8 |8 V$ W0 t$ v" B9 x0 C6 h' P4 C
日志保存清理策略
) y9 r/ _( K" E% p
6 C$ u, ?; r, v E7 L) c$ ~6 |* C% M& P, ]# q" }5 s7 b$ \% `) s
# M+ d8 k: Y& |; i
这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。 % `- c7 o) c1 H, d# ~2 T
但是文件本身,仍然是存在的,只有当过了 log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。
6 @& \1 w+ b: J( H
4 Z, Z) q; \' c- \& X1 u: Y8 ^0 i* S- {0 v7 Z u7 d% d
后面我会把另外两个中间件也分别写文章分析,可以给我点个关注第一时间接到通知
0 N! n, S1 L* m$ u/ ` |