Spark ALS实现的步骤是什么
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章主要讲解了"Spark ALS实现的步骤是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark ALS实现的步骤是什么"吧!spark
千家信息网最后更新 2025年02月04日Spark ALS实现的步骤是什么
这篇文章主要讲解了"Spark ALS实现的步骤是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark ALS实现的步骤是什么"吧!
spark ALS算法是做个性推荐用的,它所需要的数据集是类似用户对商品的打分表之类的数据集。实现步骤主要以下几步:
1、定义输入数据
2、输入数据转换成评分数据格式,如case class Rating(user: Int, movie: Int, rating: Float)
3、设计ALS模型训练数据
4、计算推荐数据,存储起来供业务系统直接使用。
下面看看具体的代码:
package recommendimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesimport org.apache.spark.rdd.RDDimport org.apache.spark.ml.evaluation.RegressionEvaluatorimport org.apache.spark.ml.recommendation.ALSimport org.apache.spark.ml.feature.StringIndexerimport org.apache.spark.sql.Datasetimport org.apache.spark.sql.Rowimport org.apache.spark.ml.feature.IndexToStringimport scala.collection.mutable.ArrayBufferimport org.apache.spark.TaskContextimport org.apache.spark.ml.Pipelineimport org.apache.spark.sql.SaveMode/** * 个性化推荐ALS算法 * 用户对资源的点击率作为评分 * */object Recommend { case class Rating(user: Int, movie: Int, rating: Float) def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("Java Spark MYSQL Recommend") .master("local") .config("es.nodes", "127.0.0.1") .config("es.port", "9200") .config("es.mapping.date.rich", "false") //不解析日期类型 .getOrCreate() trainModel(spark) spark.close() } def trainModel(spark: SparkSession): Unit = { import spark.implicits._ val MAX = 3 // 最大推荐数目 val rank = 10 // 向量大小,默认10 val iterations = 10 // 迭代次数,默认10 val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8" val table = "clicks" val user = "root" val pass = "123456" val props = new Properties() props.setProperty("user", user) // 设置用户名 props.setProperty("password", pass) // 设置密码 val clicks = spark.read.jdbc(url, table, props).repartition(4) clicks.createOrReplaceGlobalTempView("clicks") val agg = spark.sql("SELECT userId ,resId ,COUNT(id) AS clicks FROM global_temp.clicks GROUP BY userId,resId") val userIndexer = new StringIndexer() .setInputCol("userId") .setOutputCol("userIndex") val resIndexer = new StringIndexer() .setInputCol("resId") .setOutputCol("resIndex") val indexed1 = userIndexer.fit(agg).transform(agg) val indexed2 = resIndexer.fit(indexed1).transform(indexed1) indexed2.show() val ratings = indexed2.map(x => Rating(x.getDouble(3).toInt, x.getDouble(4).toInt, x.getLong(2).toFloat)) ratings.show() val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1)) println("training:") training.show() println("test:") test.show() //隐性反馈和显示反馈 val als = new ALS() .setMaxIter(iterations) .setRegParam(0.01) .setImplicitPrefs(false) .setUserCol("user") .setItemCol("movie") .setRatingCol("rating") val model = als.fit(ratings) // Evaluate the model by computing the RMSE on the test data // Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics model.setColdStartStrategy("drop") val predictions = model.transform(test) val r2 = model.recommendForAllUsers(MAX) println(r2.schema) val result = r2.rdd.flatMap(row => { val userId = row.getInt(0) val arrayPredict: Seq[Row] = row.getSeq(1) var result = ArrayBuffer[Rating]() arrayPredict.foreach(rowPredict => { val p = rowPredict(0).asInstanceOf[Int] val score = rowPredict(1).asInstanceOf[Float] val sql = "insert into recommends(userId,resId,score) values (" + userId + "," + rowPredict(0) + "," + rowPredict(1) + ")" println("sql:" + sql) result.append(Rating(userId, p, score)) }) for (i <- result) yield { i } }) println("推荐结果RDD已展开") result.toDF().show() //资源id隐射 val resInt2Index = new IndexToString() .setInputCol("movie") .setOutputCol("resId") .setLabels(resIndexer.fit(indexed1).labels) //userId映射 val userInt2Index = new IndexToString() .setInputCol("user") .setOutputCol("userId") .setLabels(userIndexer.fit(agg).labels) val rc = userInt2Index.transform(resInt2Index.transform(result.toDF())) rc.show() rc.withColumnRenamed("rating","score").select("userId", "resId","score").write.mode(SaveMode.Overwrite) .format("jdbc") .option("url", url) .option("dbtable", "recommends") .option("user", user) .option("password", pass) .option("batchsize", "5000") .option("truncate", "true") .save println("finished!!!") }}
DataFrame写入mysql还有另一种写法,就是原生写入:
//分区写推荐结果到mysql r2.foreachPartition(p => { @transient val conn = ConnectionPool.getConnection p.foreach(row => { val userId = row.getInt(0) val arrayPredict: Seq[Row] = row.getSeq(1) arrayPredict.foreach(rowPredict => { println(rowPredict(0) + "@" + rowPredict(1)) val sql = "insert into recommends(userId,resId,score) values (" + userId+"," + rowPredict(0)+","+ rowPredict(1) + ")" println("sql:"+sql) val stmt = conn.createStatement stmt.executeUpdate(sql) }) }) ConnectionPool.returnConnection(conn) })
感谢各位的阅读,以上就是"Spark ALS实现的步骤是什么"的内容了,经过本文的学习后,相信大家对Spark ALS实现的步骤是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
推荐
步骤
用户
学习
个性
内容
就是
算法
结果
资源
评分
输入
最大
业务
代码
写法
向量
商品
大小
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全工程师简笔画头像
电脑登录金铲铲显示服务器爆满
去哪儿软件开发员工级别
超凡先锋服务器为什么那么卡
如何创建数据库sa
手机pubg服务器延迟
数据库简单的增删改查
数据库名字和数据库文件名字
网络拓扑结构的网络安全威胁
中科院网络安全教程第11讲
sql查找数据库
insert图片进入数据库
数据库设计参与的人员
国内主流网络安全产品
数据库 现有关系供应商
主体与客体网络安全保卫
数据库鸡兔同笼
空间数据库和地理科学的关系
网络安全 读博研究规划
鹤壁网络安全系统价格
浦东新区融合led大屏服务器
网络技术术语词汇大全
校园网络安全怎么设计
地税局网络安全检查
临沂软件开发工资每月多少钱
安卓软件开发想法
客户端连接数据服务器
网络安全工作责任制落实指引
云服务器一个月收费多少
网络技术认证