千家信息网

spark sql在scala中使用的方式有哪些

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章主要介绍"spark sql在scala中使用的方式有哪些",在日常操作中,相信很多人在spark sql在scala中使用的方式有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作
千家信息网最后更新 2025年01月24日spark sql在scala中使用的方式有哪些

这篇文章主要介绍"spark sql在scala中使用的方式有哪些",在日常操作中,相信很多人在spark sql在scala中使用的方式有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"spark sql在scala中使用的方式有哪些"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

package hgs.spark.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.SQLImplicitsimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.IntegerTypeimport org.apache.spark.sql.Row//第一种方法创建dataframeobject SqlTest1 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("sqltest1").setMaster("local")    val context = new SparkContext(conf)    val sqlContext = new SQLContext(context)        val rdd = context.textFile("d:\\person",1)    val rdd2 = rdd.map(x=>{val t = x.split(" ");person(t(0).toInt,t(1),t(2).toInt)})    //第一种方法创建dataframe,在这里需要导入隐式转换    import sqlContext.implicits._          val persondf = rdd2.toDF()      //这个方法在2.1.0里面被废除    //persondf.registerTempTable("person")    //使用该函数代替    persondf.createOrReplaceTempView("person")    val result = sqlContext.sql("select * from person order by age desc")    //打印查询的结果    result.show()    //或者将结果保存到文件    result.write.json("d://personselect")       context.stop()  }}case class person(id:Int,name:String,age:Int)//第二种方法创建dataframe//3.1.2.        通过StructType直接指定Schemaobject SqlTest2{  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("sqltest2").setMaster("local")    val context = new SparkContext(conf)    val sqlContext = new SQLContext(context)        val rdd = context.textFile("d:\\person",1)        //第一种方法创建dataframe,在这里需要导入隐式转换    //创建schema,即一个映射关系       val personShcema = StructType(    List(        //下面为一个列的描述,分别为 列名,数据类型,是否为空        StructField("id",IntegerType,true),        StructField("name",StringType,true),        StructField("age",IntegerType,true)     )        )        val rdd2 = rdd.map(x=>{val t = x.split(" ");Row(t(0).toInt,t(1),t(2).toInt)})    //通过这种方式创建dataframeval persondf = sqlContext.createDataFrame(rdd2, personShcema)    //将dataframe映射为一个临时的表    persondf.createOrReplaceTempView("person")    //查询数据展示    sqlContext.sql("select * from person order by age desc").show()    context.stop()/*  查询出的数据 *  +---+----+---+    | id|name|age|    +---+----+---+    |  1| hgs| 26|    |  3|  zz| 25|    |  2|  wd| 24|    |  4|  cm| 24|    +---+----+---+    */      }}
一些笔记:checkpoint:        将rdd中间过程持久化到hdfs上面,如果某个rdd失败,则从hdfs回复,这样代价较小        sc.setCheckpointDir("hdfs dir or other fs dir "),建议将RDD cache之后再        checkpoin这样将减少一次运算直接从内存中将RDD进行checkpoin        但是这样之前依赖的RDD也会被丢弃RDD Objects构建DAG--->DAGScheduler(TaskSet(每个Task在每个excutor上&&切分stage,并提价stage))    ------>TaskScheduler(Task&&提交task,)------>Worker  (执行task)stage:根据依赖关系区分stage,当遇到一个宽依赖(节点之间交换数据)的时候划分一个stage        其中窄依赖:父RDD的分区数据只传向一个子RDD分区,而宽依赖则是父RDD的分区数据会传向多个子RDD的或者多个分区        spark SQL:处理结构化的数据        DataFrames:与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,                除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持                嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层                的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,                Spark DataFrame很好地继承了传统单机数据分析的开发体验        创建DataFrame: 将数据映射为class,RDD.toDF         通过sql查询,将df注册为一个表1. df.registerTempTable("test") sqlContext.sql("select * from test").show                                                                  2.通过StructType定义:StrutType(List())hive 3.0.0 与spark        1.将hive-site.xml hdfs-site.xml  core-site.xml复制到spark的conf文件夹下 ,将mysql驱动放到spark的jars文件夹下面        2.在hive中的语句在spark-sql中完全适用:                create table person(id int,name string,age int) row format delimited fields terminated by ' ';                load data inpath 'hdfs://bigdata00:9000/person' into table person;                select * from person;                数据如下:                        1       hgs     26                        2       wd      24                        3       zz      25                        4       cm      24        3.在spark-sql console交互中会打印很多的INFO级别的信息,很烦人,解决办法是                在conf文件夹下:                   mv log4j.properties.template  log4j.properties                        将log4j.rootCategory=INFO, console 修改为log4j.rootCategory=WARN, console

到此,关于"spark sql在scala中使用的方式有哪些"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

数据 方法 方式 文件 学习 查询 文件夹 传统 信息 函数 多个 更多 类型 结构 结果 帮助 烦人 接下来 个子 中将 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 各大网站数据库有哪些 ACE连接数据库 南瑞信通网络安全管理装置 河北应用软件开发哪家便宜 战地五服务器图标 廊坊有没有软件开发工程师学校 iapp怎样接入服务器 腾讯云数据库产品架构师 超市管理系统数据库设计 江苏省网络安全技能大赛冠军 软件开发期末考试 WEB服务器IIS的配置 软件测试转软件开发好转嘛 服务器安全狗监听端口 深圳手信网络技术有限公司 江阴进口网络技术创新服务 三门峡日报网络安全 客户端怎么显示数据库的数据 odbc连接数据库不显示 数据库字段声明个数 保山行业专业软件开发 天津统一软件开发服务郑重承诺 网络安全监督检查整改通知书 北方电信服务器换号 软件开发加班厉害吗 移动软件开发公司什么时候放假 云服务器租用服务价格 方舟端游加入服务器连接超时 客户端怎么显示数据库的数据 java 跨数据库查询
0