千家信息网

Spark机器学习

发表于:2024-12-01 作者:千家信息网编辑
千家信息网最后更新 2024年12月01日,Spark机器学习Pipelines中的主要概念MLlib 提供的API可以通过Pipelines将多个复杂的机器学习算法结合成单个pipeline或者单个工作流。这个概念和scikit-learn里
千家信息网最后更新 2024年12月01日Spark机器学习

Spark机器学习

Pipelines中的主要概念

MLlib 提供的API可以通过Pipelines将多个复杂的机器学习算法结合成单个pipeline或者单个工作流。这个概念和scikit-learn里的概念类似,根据官方的说法是,此抽象概念的设计灵感来自于scikit-learn

· DataFrame:通过Spark SQL 组件里的DataFrame作为机器学习的数据集。支持多种数据类型.比如 DataFrame 可以将文本,数据库等外部数据源划分为不同的列,包含特征向量, 特征值等。

· Transformer: 一个 Transformer可以将一个DataFrame 转换成另一个DataFrame. 比如, 一个机器学习模型可以将带有特征值的DataFrame转换为一个带有模型预测结果数据的DataFrame.

· Estimator:通过 DataFrame数据集进行训练 产生一个机器学习模型的算法。

· Pipeline:联合多个 Transformer Estimator构成一个机器学习工作流

· Parameter: 所有Transformer Estimator指定参数的共享API

DataFrame

DataFrame里广泛运用的数据结构,可以包含向量,文本,图片,以及结构化数据。DataFrame通过Spark SQL支持多种数据源。

工作流程如图所示:


机器学习中Pipleline流程图

正如图中所示,Pipeline有三个阶段,每个阶段要么是Transformer ,要么就是Estimator,这些阶段按照一定的顺序执行,执行的过程中,通过圆柱体代表的DataFrame类型的Raw text产生一个新的Words(DataFrame类型),最后建立了一个LogisticRegressionModel。图中的Tokenizer,HashingTF都是Transformer,LogisticRegressionModelEstimator

Transformer 阶段,主要调用transform()方法进行计算。

Estimator阶段,主要调用fit()方法进行计算。

DAG Pipelines:多个阶段形成一个pipeline,同理,DAG Pipelines就是多个pipeline组成的一个有向无环图。

运行时检查:数据结构DataFrame中可以有各种各样的数据,但是在编译的时候不会检查数据的数据类型,而是在运行的时候才根据DataFrameSchema来检查数据类型。

唯一ID标识:Pipeline的每一个阶段(stage)都通过id来进行唯一的标识,同一个相同的实列,比如HashingTF不会插入到同一个Pipeline俩次,因为每一个stage都有自身的唯一的ID来进行标识

保存和读取pipeline

代码案例:

Estimator, Transformer, 以及 Param综合案例

importorg.apache.spark.ml.classification.LogisticRegression

importorg.apache.spark.ml.linalg.{Vector,Vectors}

importorg.apache.spark.ml.param.ParamMap

importorg.apache.spark.sql.Row

// Prepare training data from a list of (label, features)tuples.

valtraining=spark.createDataFrame(Seq(

(1.0,Vectors.dense(0.0,1.1,0.1)),

(0.0,Vectors.dense(2.0,1.0,-1.0)),

(0.0,Vectors.dense(2.0,1.3,1.0)),

(1.0,Vectors.dense(0.0,1.2,-0.5))

)).toDF("label","features")

// Create a LogisticRegression instance. This instance is anEstimator.

vallr=newLogisticRegression()

// Print out the parameters, documentation, and any defaultvalues.

println("LogisticRegressionparameters:\n"+lr.explainParams()+"\n")

// We may set parameters using setter methods.

lr.setMaxIter(10)

.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parametersstored in lr.

valmodel1=lr.fit(training)

// Since model1 is a Model (i.e., a Transformer produced byan Estimator),

// we can view the parameters it used during fit().

// This prints the parameter (name: value) pairs, where namesare unique IDs for this

// LogisticRegression instance.

println("Model 1 was fit usingparameters: "+model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,

// which supports several methods for specifying parameters.

valparamMap=ParamMap(lr.maxIter->20)

.put(lr.maxIter,30) // Specify 1 Param. This overwrites the original maxIter.

.put(lr.regParam->0.1,lr.threshold->0.55) // Specify multiple Params.

// One can also combine ParamMaps.

valparamMap2=ParamMap(lr.probabilityCol->"myProbability") // Change output column name.

valparamMapCombined=paramMap++paramMap2

// Now learn a new model using the paramMapCombinedparameters.

// paramMapCombined overrides all parameters set earlier vialr.set* methods.

valmodel2=lr.fit(training,paramMapCombined)

println("Model 2 was fit usingparameters: "+model2.parent.extractParamMap)

// Prepare test data.

valtest=spark.createDataFrame(Seq(

(1.0,Vectors.dense(-1.0,1.5,1.3)),

(0.0,Vectors.dense(3.0,2.0,-0.1)),

(1.0,Vectors.dense(0.0,2.2,-1.5))

)).toDF("label","features")

// Make predictions on test data using theTransformer.transform() method.

// LogisticRegression.transform will only use the 'features'column.

// Note that model2.transform() outputs a 'myProbability'column instead of the usual

// 'probability' column since we renamed thelr.probabilityCol parameter previously.

model2.transform(test)

.select("features","label","myProbability","prediction")

.collect()

.foreach{caseRow(features:Vector,label:Double,prob:Vector,prediction:Double)=>

println(s"($features, $label) -> prob=$prob, prediction=$prediction")

}

Pipeline单独的案例代码

importorg.apache.spark.ml.{Pipeline,PipelineModel}
importorg.apache.spark.ml.classification.LogisticRegression
importorg.apache.spark.ml.feature.{HashingTF,Tokenizer}
importorg.apache.spark.ml.linalg.Vector
importorg.apache.spark.sql.Row
 
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L,"a b c d e spark",1.0),
  (1L,"b d",0.0),
  (2L,"spark f g h",1.0),
  (3L,"hadoop mapreduce",0.0)
)).toDF("id","text","label")
 
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer =newTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF =newHashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr =newLogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline =newPipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
 
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
 
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
 
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
 
// And load it back in during production
val sameModel =PipelineModel.load("/tmp/spark-logistic-regression-model")
 
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L,"spark i j k"),
  (5L,"l m n"),
  (6L,"spark hadoop spark"),
  (7L,"apache hadoop")
)).toDF("id","text")
 
// Make predictions on test documents.
model.transform(test)
  .select("id","text","probability","prediction")
  .collect()
  .foreach{caseRow(id:Long, text:String, prob:Vector, prediction:Double)=>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }



数据 机器 学习 阶段 类型 多个 概念 标识 案例 模型 特征 结构 工作 检查 代码 单个 向量 多种 就是 工作流 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 湖北省超级服务器云服务器 设计数据库系统用什么语言 sql 2000压缩数据库 深圳前端软件开发收费报价表 网络技术基础专升本考试 网络技术能使我们在家上课的翻译 dsp的国产软件开发平台 做网络安全有哪些方式 上海软件开发与测试专业 sql数据库去重 泸州网络技术服务 街头篮球老是与服务器连接中断 国企网络安全协调局 网络安全技术文案高级 锡山区多功能软件开发注意事项 河北靠谱虚拟主机云空间云服务器 浙江省数据库三级 武汉亿视互联网科技有限公司 江西信息软件开发电话多少 嘉韵互联网科技有限公司 16e数据库在线查询网站 安卓登录注册到云端数据库 天津大数据库安全 网络安全发第五十九条规定 无锡安卓的软件开发 数据库查找数据源的子句代码 天津通用软件开发价格表 服务器状态异常无法连接数据库 网络安全校园日活动主题班会 db2修改数据库语句
0