千家信息网

Spark on YARN in Practice

Published: 2025-01-24 Author: 千家信息网 editor

A Hadoop environment and Spark are already deployed here.

The environment is as follows:

192.168.1.2 master

[hadoop@master ~]$ jps
2298 SecondaryNameNode
2131 NameNode
2593 JobHistoryServer
4363 Jps
3550 HistoryServer
2481 ResourceManager
3362 Master


192.168.1.3 slave1

[hadoop@slave1 ~]$ jps
2919 Jps
2464 Worker
1993 DataNode
2109 NodeManager


192.168.1.4 slave2

[hadoop@slave2 ~]$ jps
2762 Jps
2113 NodeManager
1998 DataNode
2452 Worker


The example used here is the Python program bundled with Spark that estimates the value of pi.

[hadoop@master ~]$ cd spark
[hadoop@master spark]$ find . -name "pi.py"
[hadoop@master spark]$ cat ./examples/src/main/python/pi.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
from random import random
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    """
        Usage: pi [slices]
    """
    sc = SparkContext(appName="PythonPi")
    slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * slices

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)

[hadoop@master spark]$ cd ./examples/src/main/python/
# Edit pi.py and add sc.stop() at the end
[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py
# If the error below is reported, map 127.0.0.1 to localhost in the hosts file
Traceback (most recent call last):
  File "/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py", line 29, in <module>
    sc = SparkContext(appName="PythonPi")
  File "/home/hadoop/spark/python/pyspark/context.py", line 138, in __init__
    self._accumulatorServer = accumulators._start_update_server()
  File "/home/hadoop/spark/python/pyspark/accumulators.py", line 224, in _start_update_server
    server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
  File "/usr/lib64/python2.6/SocketServer.py", line 402, in __init__
    self.server_bind()
  File "/usr/lib64/python2.6/SocketServer.py", line 413, in server_bind
    self.socket.bind(self.server_address)
  File "<string>", line 1, in bind
socket.gaierror: [Errno -3] Temporary failure in name resolution
# A normal run looks like this
[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 12:18:27 INFO spark.SecurityManager: Changing view acls to: hadoop
15/03/25 12:18:27 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
15/03/25 12:18:28 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/25 12:18:28 INFO Remoting: Starting remoting
15/03/25 12:18:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/25 12:18:29 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/25 12:18:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150325121829-88cd
15/03/25 12:18:29 INFO storage.MemoryStore: MemoryStore started with capacity 116.0 MB.
15/03/25 12:18:30 INFO network.ConnectionManager: Bound socket to port 48556 with id = ConnectionManagerId(master,48556)
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/03/25 12:18:30 INFO storage.BlockManagerInfo: Registering block manager master:48556 with 116.0 MB RAM
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Registered BlockManager
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48872
15/03/25 12:18:30 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.1.2:48872
15/03/25 12:18:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e2d76bbd-d2f6-4b2f-a018-f2d795a488aa
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43148
15/03/25 12:18:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 12:18:31 INFO ui.SparkUI: Started SparkUI at http://master:4040
15/03/25 12:18:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 12:18:35 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/spark/log/pythonpi-1427311113352
15/03/25 12:18:35 INFO util.Utils: Copying /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py to /tmp/spark-b66e65a9-91dc-479c-8938-14314fd1febb/pi.py
15/03/25 12:18:36 INFO spark.SparkContext: Added file file:/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py at http://192.168.1.2:43148/files/pi.py with timestamp 1427311115935
15/03/25 12:18:36 INFO client.AppClient$ClientActor: Connecting to master spark://master:7077...
15/03/25 12:18:38 INFO spark.SparkContext: Starting job: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Got job 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) with 2 output partitions (allowLocal=false)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37), which has no missing parents
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37)
15/03/25 12:18:38 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150325121838-0001
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/0 on worker-20150325114825-slave1-50832 (slave1:50832) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/0 on hostPort slave1:50832 with 1 cores, 200.0 MB RAM
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/1 on worker-20150325114823-slave2-56888 (slave2:56888) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/1 on hostPort slave2:56888 with 1 cores, 200.0 MB RAM
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/0 is now RUNNING
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/1 is now RUNNING
15/03/25 12:18:43 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave1:35398/user/Executor#765391125] with ID 0
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: slave1 (PROCESS_LOCAL)
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 374986 bytes in 12 ms
15/03/25 12:18:44 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave2:37669/user/Executor#2076348799] with ID 1
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: slave2 (PROCESS_LOCAL)
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 502789 bytes in 4 ms
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave1:47192 with 116.0 MB RAM
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave2:42313 with 116.0 MB RAM
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 0 in 2534 ms on slave1 (progress: 1/2)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 1 in 2234 ms on slave2 (progress: 2/2)
15/03/25 12:18:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stage 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) finished in 7.867 s
15/03/25 12:18:46 INFO spark.SparkContext: Job finished: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38, took 8.181053565 s
Pi is roughly 3.147220
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
15/03/25 12:18:46 INFO ui.SparkUI: Stopped Spark web UI at http://master:4040
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
15/03/25 12:18:47 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/25 12:18:47 INFO network.ConnectionManager: Selector thread was interrupted!
15/03/25 12:18:47 INFO network.ConnectionManager: ConnectionManager stopped
15/03/25 12:18:47 INFO storage.MemoryStore: MemoryStore cleared
15/03/25 12:18:47 INFO storage.BlockManager: BlockManager stopped
15/03/25 12:18:47 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
15/03/25 12:18:47 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/03/25 12:18:47 INFO Remoting: Remoting shut down
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/03/25 12:18:48 INFO spark.SparkContext: Successfully stopped SparkContext
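The traceback above shows PySpark binding a TCP server to "localhost" and failing with a name-resolution error, which is why the fix is a hosts-file entry. A minimal sketch of how that precondition could be checked before submitting is given below. It is written in Python 3 (the cluster in this article runs Python 2.6), and the helper names `hosts_maps_localhost` and `can_resolve` are hypothetical, introduced only for illustration.

```python
import socket


def hosts_maps_localhost(hosts_text):
    """Return True if the hosts-file text maps 127.0.0.1 to the name localhost."""
    for raw_line in hosts_text.splitlines():
        # Drop trailing comments and surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        if not line:
            continue
        fields = line.split()
        # First field is the address, the remaining fields are host names.
        if fields[0] == "127.0.0.1" and "localhost" in fields[1:]:
            return True
    return False


def can_resolve(hostname):
    """Return True if the system resolver can resolve the given host name."""
    try:
        socket.gethostbyname(hostname)
        return True
    except OSError:  # socket.gaierror is a subclass of OSError
        return False
```

On the master node, one might read /etc/hosts, pass its text to `hosts_maps_localhost`, and call `can_resolve("localhost")` to confirm the fix took effect.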

View job monitoring at http://192.168.1.2:8080/

View worker information at http://192.168.1.3:8081/

Spark on YARN in practice

[hadoop@master ~]$ cd spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@master examples]$ spark-submit --master yarn-cluster \
> --class org.apache.spark.examples.SparkPi \
> --driver-memory 400m \
> --executor-memory 400m \
> --executor-cores 1 \
> --num-executors 2 \
> /home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2
# If an error is reported, edit yarn-site.xml
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>800</value>
</property>
# a value of at least 800 is enough; then restart YARN
# A normal run looks like this:
[hadoop@master sbin]$ spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 400m --executor-memory 400m --executor-cores 1 --num-executors 2 /home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 13:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 13:06:09 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:09 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 2
15/03/25 13:06:09 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
      queueApplicationCount = 0, queueChildQueueCount = 0
15/03/25 13:06:09 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 800
15/03/25 13:06:09 INFO yarn.Client: Preparing Local resources
15/03/25 13:06:10 INFO yarn.Client: Uploading file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_1427313904247_0001/spark-examples-1.0.2-hadoop2.2.0.jar
15/03/25 13:06:13 INFO yarn.Client: Uploading file:/home/hadoop/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_1427313904247_0001/spark-assembly-1.0.2-hadoop2.2.0.jar
15/03/25 13:06:25 INFO yarn.Client: Setting up the launch environment
15/03/25 13:06:25 INFO yarn.Client: Setting up container launch context
15/03/25 13:06:25 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx400m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.app.name=\"org.apache.spark.examples.SparkPi\", -Dspark.eventLog.enabled=\"true\", -Dspark.eventLog.dir=\"hdfs://master:9000/spark/log\", -Dspark.yarn.historyServer.address=\"master:18080\",  -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar,  --args  '2' , --executor-memory, 400, --executor-cores, 1, --num-executors , 2, 1>, /stdout, 2>, /stderr)
15/03/25 13:06:25 INFO yarn.Client: Submitting application to ASM
15/03/25 13:06:25 INFO impl.YarnClientImpl: Submitted application application_1427313904247_0001 to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:26 INFO yarn.Client: Application report from ASM:
         application identifier: application_1427313904247_0001
         appId: 1
         clientToAMToken: null
         appDiagnostics:
         appMasterHost: N/A
         appQueue: default
         appMasterRpcPort: 0
         appStartTime: 1427313985731
         yarnAppState: ACCEPTED
         distributedFinalState: UNDEFINED
         appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
         appUser: hadoop
15/03/25 13:06:27 INFO yarn.Client: Application report from ASM:
         application identifier: application_1427313904247_0001
         appId: 1
         clientToAMToken: null
         appDiagnostics:
         appMasterHost: N/A
         appQueue: default
         appMasterRpcPort: 0
         appStartTime: 1427313985731
         yarnAppState: ACCEPTED
         distributedFinalState: UNDEFINED
         appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
         appUser: hadoop
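The yarn.scheduler.maximum-allocation-mb setting matters because YARN rejects a container request larger than that limit, and the request is larger than the heap size given to spark-submit. The sketch below illustrates the arithmetic under a stated assumption: that Spark adds a fixed 384 MB memory overhead to each requested heap. That figure is an assumption about this Spark version's default, not something shown in the log above, so check the configuration of the version in use. The function names are hypothetical.

```python
# ASSUMPTION: fixed per-container overhead added on top of the JVM heap.
ASSUMED_OVERHEAD_MB = 384


def container_request_mb(heap_mb, overhead_mb=ASSUMED_OVERHEAD_MB):
    """Memory asked of YARN for one container: heap plus overhead."""
    return heap_mb + overhead_mb


def fits_in_yarn(heap_mb, max_allocation_mb, overhead_mb=ASSUMED_OVERHEAD_MB):
    """True if the container request stays within the scheduler's maximum allocation."""
    return container_request_mb(heap_mb, overhead_mb) <= max_allocation_mb


if __name__ == "__main__":
    # Values from the spark-submit call above: 400m heap, 800 MB maximum allocation.
    print(container_request_mb(400), fits_in_yarn(400, 800))
```

Under that assumption a 400m driver or executor asks for 784 MB, which fits within the 800 MB limit reported in the log, while a 512m request would not.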


View the YARN monitoring page: http://192.168.1.2:8088/cluster

You can see that the job ran on slave2.

Visit http://192.168.1.4:8042/node

Log in to slave2 to check the output:
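The container log directory inspected on slave2 is named after the application ID. A minimal sketch of that naming scheme follows, assuming the Hadoop 2.2-style container ID format (cluster timestamp, application sequence number, two-digit attempt number, six-digit container number) and the log root used on this cluster. The function `container_log_dir` and its default arguments are hypothetical and only illustrate the pattern.

```python
def container_log_dir(app_id, attempt=1, container=1,
                      log_root="/home/hadoop/hadoop/logs/userlogs"):
    """Build the NodeManager log directory for one container of a YARN application.

    ASSUMPTION: container IDs look like
    container_<clusterTimestamp>_<appSequence>_<attempt:02d>_<container:06d>.
    """
    parts = app_id.split("_")
    if len(parts) != 3 or parts[0] != "application":
        raise ValueError("unexpected application id: %r" % app_id)
    _, cluster_ts, app_seq = parts
    container_id = "container_%s_%s_%02d_%06d" % (cluster_ts, app_seq, attempt, container)
    return "%s/%s/%s" % (log_root, app_id, container_id)


if __name__ == "__main__":
    print(container_log_dir("application_1427313904247_0001"))
```

In yarn-cluster mode the driver runs inside the ApplicationMaster container, which is why the program's "Pi is roughly" line ends up in that container's stdout file rather than on the submitting terminal.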

[hadoop@slave2 ~]$ cd /home/hadoop/hadoop/logs/userlogs/application_1427313904247_0001/container_1427313904247_0001_01_000001
[hadoop@slave2 container_1427313904247_0001_01_000001]$ ls
stderr  stdout
[hadoop@slave2 container_1427313904247_0001_01_000001]$ cat stdout
Pi is roughly 3.13774
[hadoop@slave2 ~]$ cd /home/hadoop/spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@slave2 examples]$ cat SparkPi.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
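Both pi.py and SparkPi.scala use the same Monte Carlo estimate: sample points uniformly in the square [-1, 1] x [-1, 1], count those that fall inside the unit circle, and multiply the hit ratio by 4. The sketch below reproduces that calculation in plain Python 3 without Spark, so the arithmetic can be checked on a single machine. The function names and the optional `seed` parameter are additions for illustration and are not part of the bundled examples.

```python
import random


def inside_unit_circle(rng):
    """Sample one point in [-1, 1] x [-1, 1]; return 1 if it lies inside the unit circle."""
    x = rng.random() * 2 - 1
    y = rng.random() * 2 - 1
    return 1 if x * x + y * y < 1 else 0


def estimate_pi(slices=2, samples_per_slice=100000, seed=None):
    """Estimate pi from n = samples_per_slice * slices random points.

    The circle covers pi/4 of the square, so pi is roughly 4 * hits / n.
    """
    rng = random.Random(seed)
    n = samples_per_slice * slices
    hits = sum(inside_unit_circle(rng) for _ in range(n))
    return 4.0 * hits / n


if __name__ == "__main__":
    print("Pi is roughly %f" % estimate_pi(slices=2))
```

With 200,000 samples the estimate typically varies in the second or third decimal place between runs, which is consistent with the two different values printed by the runs in this article.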


Using spark-shell on YARN

[hadoop@master ~]$ spark-shell --master yarn-client

