千家信息网

Spark submit依赖包管理!

发表于:2024-10-01 作者:千家信息网编辑
千家信息网最后更新 2024年10月01日,Spark submit依赖包管理!使用spark-submit时,应用程序的jar包以及通过-jars选项包含的任意jar文件都会被自动传到集群中。spark-submit --class --ma
千家信息网最后更新 2024年10月01日Spark submit依赖包管理!

Spark submit依赖包管理!


使用spark-submit时,应用程序的jar包以及通过-jars选项包含的任意jar文件都会被自动传到集群中。

spark-submit --class --master --jars

Spark使用了下面的URL格式允许不同的jar包分发策略。

1、文件file方式:

绝对路径且file:/URIs是作为driver的HTTP文件服务器,且每个executor会从driver的HTTP服务器拉取文件;

2、hdfs方式:

http:,https:,ftp:,从这些给定的URI中拉取文件和JAR包;

3、本地local方式:

以local:/开始的URI应该是每个worker节点的本地文件,这意味着没有网络IO开销,并且推送或通过NFS/GlusterFS等共享到每个worker大文件/JAR文件或能很好的工作。


注意每个SparkContext的JAR包和文件都会被复制到executor节点的工作目录下,这将用掉大量的空间,然后还需要清理干净。

在YARN下,清理是自动进行的。在Spark Standalone下,自动清理可以通过配置spark.worker.cleanup.appDataTtl属性做到,此配置属性的默认值是7*24*3600。

用户可以用--packages选项提供一个以逗号分隔的maven清单来包含任意其他依赖。

其它的库(或SBT中的resolvers)可以用--repositories选项添加(同样用逗号分隔),这些命令都可以用在pyspark,spark-shell和spark-submit中来包含一些Spark包。

对Python而言,--py-files选项可以用来向executors分发.egg,.zip和.py库。


源码走读:


1、

object SparkSubmit


2、

appArgs.{  SparkSubmitAction.=> (appArgs)  SparkSubmitAction.=> (appArgs)  SparkSubmitAction.=> (appArgs)}

3、

(args: SparkSubmitArguments): = {  (childArgschildClasspathsysPropschildMainClass) = (args)  (): = {    (args.!= ) {      proxyUser = UserGroupInformation.createProxyUser(args.UserGroupInformation.getCurrentUser())      {        proxyUser.doAs(PrivilegedExceptionAction[]() {          (): = {            (childArgschildClasspathsysPropschildMainClassargs.)          }        })

4、

(jar <- childClasspath) {  (jarloader)}

5、

(localJar: loader: MutableURLClassLoader) {  uri = Utils.(localJar)  uri.getScheme {    | =>      file = File(uri.getPath)      (file.exists()) {        loader.addURL(file.toURI.toURL)      } {        (file)      }    _ =>      (uri)  }}

之后线索就断了,回归到java的class类调用jar包。

6、谁调用,executor。

(newFiles: HashMap[]newJars: HashMap[]) {  hadoopConf = SparkHadoopUtil..newConfiguration()  synchronized {    ((nametimestamp) <- newFiles .getOrElse(name-) < timestamp) {      logInfo(+ name + + timestamp)      Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)      (name) = timestamp    }    ((nametimestamp) <- newJars) {      localName = name.split().last      currentTimeStamp = .get(name)        .orElse(.get(localName))        .getOrElse(-)      (currentTimeStamp < timestamp) {        logInfo(+ name + + timestamp)        Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)        (name) = timestamp        url = File(SparkFiles.()localName).toURI.toURL        (!.getURLs().contains(url)) {          logInfo(+ url + )          .addURL(url)        }      }    }  }}

Utils.fetchFile方法,进入

 /**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
* the executors, and not in local mode.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
(!cachedFile.exists()) {  (urllocalDircachedFileNameconfsecurityMgrhadoopConf)}

可见,支持本地files,Hadoop的hdfs,还有http格式的文件。


其中目录目前支持hdfs!


完毕!

0