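Ganglia is an open-source cluster monitoring project started at UC Berkeley. It is mainly used to monitor system performance, such as CPU, memory and disk utilization, I/O load and network traffic. Its graphs make it easy to see the working state of each node, which helps a great deal when adjusting and allocating system resources to improve overall system performance.

Because of licensing restrictions (LGPL), Spark's Ganglia support is not included in the default build, so to use it you have to compile Spark yourself. When building Spark with Maven, add the -Pspark-ganglia-lgpl option to package the Ganglia-related classes into spark-assembly-x.x.x-hadoopx.x.x.jar:

    [iteblog@iteblog spark]$ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests \
    -Dhadoop.version=2.4.0 -Pspark-ganglia-lgpl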
If you build with SBT, set SPARK_GANGLIA_LGPL=true. The full command is:

    [iteblog@iteblog spark]$ SPARK_HADOOP_VERSION=2.4.0 SPARK_YARN=true \
    SPARK_GANGLIA_LGPL=true sbt/sbt assembly

Alternatively, when submitting a job, add the Ganglia dependency separately through the --jars option:

    --jars lib/spark-ganglia-lgpl_2.10-x.x.x.jar ...
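Whichever route you take, it can save time to confirm that the jar you deploy really contains the GangliaSink class. A jar is an ordinary zip archive, so the following minimal Python sketch uses only the standard zipfile module to look for the entry org/apache/spark/metrics/sink/GangliaSink.class. The function name jar_has_ganglia_sink is hypothetical and is not part of Spark. The JDK's `jar tf <file>` command lists the same entries if you prefer the command line.

```python
import zipfile

# GangliaSink's fully qualified class name, converted to its entry path inside a jar.
GANGLIA_SINK_CLASS = "org.apache.spark.metrics.sink.GangliaSink"
GANGLIA_SINK_ENTRY = GANGLIA_SINK_CLASS.replace(".", "/") + ".class"


def jar_has_ganglia_sink(jar):
    """Return True if the jar (a path or a binary file object) contains GangliaSink.class."""
    # A jar is a zip archive, so zipfile can list its entries directly.
    with zipfile.ZipFile(jar) as archive:
        return GANGLIA_SINK_ENTRY in archive.namelist()
```

For example, `jar_has_ganglia_sink("lib/spark-ganglia-lgpl_2.10-x.x.x.jar")` should return True for the jar passed to --jars. The same check applies to the spark-assembly jar when you built with the LGPL profile.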
Once the dependencies are in place, add the following configuration to $SPARK_HOME/conf/metrics.properties:

    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=www.iteblog.com
    *.sink.ganglia.port=8080
    *.sink.ganglia.period=10
    *.sink.ganglia.unit=seconds
    *.sink.ganglia.ttl=1
    *.sink.ganglia.mode=multicast
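To catch typos in these entries before restarting the Spark daemons, here is a minimal Python sketch that sanity-checks the ganglia sink settings. The function names are hypothetical.

It makes some simplifying assumptions:

* It only understands the plain key=value form used above. Real Java .properties files also allow ':' separators and line continuations, which this parser ignores.
* It does not merge `*` defaults into per-instance sections.
* Its idea of required and optional keys is my reading of Spark's GangliaSink source: host and port have no default, while period, unit, ttl and mode fall back to 10, seconds, 1 and multicast. This may differ between Spark versions.

```python
EXPECTED_CLASS = "org.apache.spark.metrics.sink.GangliaSink"
# Assumption: host and port have no default in GangliaSink, so they must be set.
REQUIRED_KEYS = ("class", "host", "port")
# Keys that GangliaSink parses as integers (assumption based on its source).
INT_KEYS = ("port", "period", "ttl")
VALID_MODES = {"unicast", "multicast"}


def parse_properties(text):
    """Parse a small subset of the .properties format: key=value lines and #/! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_ganglia_sink(props, instance="*"):
    """Return a list of problems in the ganglia sink settings of one instance (e.g. '*' or 'master')."""
    prefix = instance + ".sink.ganglia."
    sink = {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
    problems = [prefix + k + " is missing" for k in REQUIRED_KEYS if k not in sink]
    if "class" in sink and sink["class"] != EXPECTED_CLASS:
        problems.append(prefix + "class should be " + EXPECTED_CLASS)
    for k in INT_KEYS:
        if k in sink and not sink[k].isdigit():
            problems.append(prefix + k + " should be an integer")
    # Compared case-insensitively here; GangliaSink upper-cases the mode (assumption).
    if "mode" in sink and sink["mode"].lower() not in VALID_MODES:
        problems.append(prefix + "mode should be unicast or multicast")
    return problems
```

Running `check_ganglia_sink(parse_properties(text))` on the seven lines above should return an empty list. A misspelled class name or an unsupported mode shows up as a readable message instead of a ClassNotFoundException or startup error.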
host and port are the address of your Ganglia monitor, so replace them with your own gmond address (gmond listens on port 8649 by default). mode supports two values: 'unicast' and 'multicast'.

If you see an exception like the following:

    15/06/11 23:35:14 ERROR MetricsSystem: Sink class org.apache.spark.metrics.sink.GangliaSink cannot be instantialized
    java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.GangliaSink
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:138)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:134)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:134)
        at org.apache.spark.metrics.MetricsSystem.<init>(MetricsSystem.scala:84)
        at org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:171)
        at org.apache.spark.deploy.worker.Worker.<init>(Worker.scala:106)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at akka.util.Reflect$.instantiate(Reflect.scala:65)
        at akka.actor.Props.newActor(Props.scala:337)
        at akka.actor.ActorCell.newActor(ActorCell.scala:534)
        at akka.actor.ActorCell.create(ActorCell.scala:560)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

check whether org.apache.spark.metrics.sink.GangliaSink was actually packaged into your Spark build. Also look over your configuration file carefully, and copy the configuration provided here as exactly as you can.

Besides the master.apps and master.workers metrics shown in the figure above, Ganglia also displays the following metrics:

    {
      "version": "3.0.0",
      "gauges": {
        "jvm.PS-MarkSweep.count": { "value": 0 },
        "jvm.PS-MarkSweep.time": { "value": 0 },
        "jvm.PS-Scavenge.count": { "value": 186 },
        "jvm.PS-Scavenge.time": { "value": 375 },
        "jvm.heap.committed": { "value": 536412160 },
        "jvm.heap.init": { "value": 536870912 },
        "jvm.heap.max": { "value": 536412160 },
        "jvm.heap.usage": { "value": 0.315636349481712 },
        "jvm.heap.used": { "value": 169311176 },
        "jvm.non-heap.committed": { "value": 37879808 },
        "jvm.non-heap.init": { "value": 24313856 },
        "jvm.non-heap.max": { "value": 184549376 },
        "jvm.non-heap.usage": { "value": 0.19970542734319513 },
        "jvm.non-heap.used": { "value": 36855512 },
        "jvm.pools.Code-Cache.usage": { "value": 0.031689961751302086 },
        "jvm.pools.PS-Eden-Space.usage": { "value": 0.9052384254331968 },
        "jvm.pools.PS-Old-Gen.usage": { "value": 0.02212668565200476 },
        "jvm.pools.PS-Perm-Gen.usage": { "value": 0.26271122694015503 },
        "jvm.pools.PS-Survivor-Space.usage": { "value": 0.5714285714285714 },
        "jvm.total.committed": { "value": 574291968 },
        "jvm.total.init": { "value": 561184768 },
        "jvm.total.max": { "value": 720961536 },
        "jvm.total.used": { "value": 206166688 },
        "master.apps": { "value": 0 },
        "master.waitingApps": { "value": 0 },
        "master.workers": { "value": 0 }
      },
      "counters": { },
      "histograms": { },
      "meters": { },
      "timers": { }
    }
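If you want to pick specific values out of a metrics dump like the one above, such as the master.* gauges or the heap figures, a short Python sketch with the standard json module is enough. The function name gauge_values is hypothetical. It assumes the dump has the same layout shown here: a top-level "gauges" object that maps each metric name to {"value": ...}. In this dump, jvm.heap.usage equals jvm.heap.used / jvm.heap.max (169311176 / 536412160 ≈ 0.3156), which makes a handy consistency check.

```python
import json


def gauge_values(metrics_json, prefix=""):
    """Return {name: value} for every gauge in a metrics JSON dump whose name starts with prefix."""
    data = json.loads(metrics_json)
    return {
        name: gauge["value"]
        for name, gauge in data.get("gauges", {}).items()
        if name.startswith(prefix)
    }
```

Calling `gauge_values(dump, "master.")` on the dump above yields the three master gauges (apps, waitingApps, workers), and the prefix "jvm.heap." yields the heap figures.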