千家信息网

怎么实践Spark

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,怎么实践Spark,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Spark小试牛刀随着项目的运营,收集了很多的用户数据。最近业务上想
千家信息网最后更新 2025年01月31日怎么实践Spark

怎么实践Spark,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

Spark小试牛刀

随着项目的运营,收集了很多的用户数据。最近业务上想做些社交图谱相关的产品,但因为数据很多、很杂,传统的数据库查询已经满足不了业务的需求。 试着用Spark来做,权当练练手了。

安装Spark

因为有Scala的开发经验,所以就不用官方提供的二进制包了,自编译scala 2.11版本。

下载Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz

tar zxf spark-1.5.0.tgzcd spark-1.5.0./dev/change-scala-version.sh 2.11mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

以上命令完成Spark基于scala 2.11版本的编译。可以运行自带的一个示例程序来验证安装是否成功。

./bin/run-example SparkPi

编写Standalone application

使用sbt来构建一个可提交的简单Spark程序,功能是计算每个用户加入的群组,并把结果保存下来。project/Build.scala配置文件如下:

import _root_.sbt.Keys._import _root_.sbt._import sbtassembly.AssemblyKeys._object Build extends Build {  override lazy val settings = super.settings :+ {    shellPrompt := (s => Project.extract(s).currentProject.id + " > ")  }  lazy val root = Project("spark-mongodb", file("."))    .settings(      scalaVersion := "2.11.7",      assemblyJarName in assembly := "spark-mongodb.jar",      assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),      libraryDependencies ++= Seq(        "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest,        "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll(            ExclusionRule(organization = "javax.servlet"),             ExclusionRule(organization = "commons-beanutils"),             ExclusionRule(organization = "org.apache.hadoop")))    )    privateval scopeProvidedTest = "provided,test"  privateval verSpark = "1.5.0"}

数据存储在MongoDB数据库中,所以我们还需要使用mongo-hadoop连接器来访问MongoDB数据库。

示例程序

示例程序非常的简单,把数据从数据库里全部读出,使用map来把每条记录里用户ID对应加入的群组ID转换成一个Set,再使用 reduceByKey来把相同用户ID的set合并到一起,存入数据库即可。

import com.mongodb.BasicDBObjectimport com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}import org.apache.hadoop.conf.Configurationimport org.apache.spark.{SparkConf, SparkContext}import org.bson.BSONObjectimport scala.collection.JavaConverters._object QQGroup {  def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setAppName("QQGroup")    val sc = new SparkContext(sparkConf)    val inputConfig = new Configuration()    inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup")    inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""")    inputConfig.set("mongo.input.noTimeout", "true")    val documentRDD = sc.newAPIHadoopRDD(      inputConfig,      classOf[MongoInputFormat],      classOf[Object],      classOf[BSONObject])    val userRDD = documentRDD.map { case (_, doc) =>      (getValue(doc, "userId"), getValue(doc, "groupId"))    }.reduceByKey(_ ++ _)    val resultRDD = userRDD.map { case (userId, groupIds) =>      val o = new BasicDBObject()      o.put("groupIds", groupIds.asJava)      userId -> o    }    val outputConfig = new Configuration()    outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup")    resultRDD.saveAsNewAPIHadoopFile(      "file://this-is-completely-unused",      classOf[Object],      classOf[BSONObject],      classOf[MongoOutputFormat[Object, BSONObject]],      outputConfig)  }  def getValue(dbo: BSONObject, key: String) = {    val value = dbo.get(key)    if (value eq null) "" else value.asInstanceOf[String]  }}

MongoDB官方提供了Hadoop连接器,Spark可以使用mongo-hadoop连接器来读、写MongoDB数据库。 主要的输入配置荐有:

  • mongo.input.uri: MongoDB的连接URI

  • mongo.input.fields: 指定返回哪些数据,与db.query里的第2个参数功能一样

  • mongo.input.query: MongoDB的查询参数

相应的MongoDB也提供了一系列的输出参数,如:

  • mongo.output.uri: MongoDB的连接URI

sc.newAPIHadoopRDD()方法有4个参数,分别为:配置、输入格式化类、待映射数据主键类型、待映射数据类型。

主要的操作代码:

    val userRDD = documentRDD.map { case (_, doc) =>      (getValue(doc, "userId"), Set(getValue(doc, "groupId")))    }.reduceByKey(_ ++ _)    val resultRDD = userRDD.map { case (userId, groupIds) =>      val o = new BasicDBObject()      o.put("groupIds", groupIds.asJava)      userId -> o    }

先使用map方法获取userIdgroupId,并把groupId转换为一个Set

在把数据转换成Tuple2,就是一个KV的形式以后,我们就可以调用一系列的转换方法来对RDD进行操作,这里使用reduceByKey方法来将同一个userId的所以value都合并在一起。这样我们就有了所有用户对应加入的群组 的一个RDD集了。

(RDD上有两种类型的操作。一种是"变换",它只是描述了待进行的操作指令,并不会触发实际的计算;另一种是"动作", 它将触发实际的计算动作,这时候系统才会实际的从数据源读入数据,操作内存,保存数据等)

最后使用resultRDD.saveAsNewAPIHadoopFile()方法来把计算结果存入MongoDB,这里的一个参数:用于指定 HDFS的存储位置并不会使用到,因为mongo-hadoop将会使用mongo.output.uri指定的存储URI连接地址来保存数据。

关于怎么实践Spark问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

0