sparl sql有哪些
发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、读取json格式的文件创建DataFramejava (spark1.6
千家信息网最后更新 2024年11月26日sparl sql有哪些
这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
1、读取json格式的文件创建DataFrame
java (spark1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("javaSpark01"); SparkContext sc = new SparkContext(conf); SQLContext sqlContext = new SQLContext(sc);// Datasetdf = sqlContext.read().format("json").load("G:/idea/scala/spark02/json"); Dataset
df2 = sqlContext.read().json("G:/idea/scala/spark02/json"); df2.show(); //树形的形式显示schema信息 df2.printSchema(); //注册临时表 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘 df2.registerTempTable("baidukt_table"); Dataset
sql = sqlContext.sql("select * from baidukt_table"); sql.show(); Dataset
sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age"); sql1.show(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("Spark08 1.6") val sc = new SparkContext(conf) val sqlContext: SQLContext = new SQLContext(sc) val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")// val df1 = sqlContext.read.json("G:/idea/scala/spark02/json") //显示前50行数据 df.show(50) //树形的形式显示schema信息 df.printSchema() //注册临时表 df.registerTempTable("baidukt_com_table") val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age") result.show() val result1 = sqlContext.sql("select * from baidukt_com_table") result1.show() sc.stop() }
java (spark 2.0++)
public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); Datasetdf = spark.read().json("G:/idea/scala/spark02/json");// Dataset
df1 = spark.read().format("json").load("G:/idea/scala/spark02/json"); df.show(); df.printSchema(); df.createOrReplaceGlobalTempView("baidu_com_spark2"); Dataset
resut = spark.sql("select * from baidu_com_spark2"); resut.show(); spark.stop(); }
scala(spark 2.0++)
def main(args: Array[String]): Unit = { //用户的当前工作目录// val location = System.setProperties("user.dir","spark_2.0" val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]") val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //数据导入方式 val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")// val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json") //查看表 df.show() //查看表 df.printSchema() //直接使用spark SQL进行查询 //先注册为临时表 //createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联。 //createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。 df.createOrReplaceTempView("people") val result: DataFrame = spark.sql("select * from people") result.show() spark.stop() }
2、通过json格式的RDD创建DataFrame
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonRDD"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDDdata = sc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"lisi\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); Dataset df = sqlContext.read().json(data); df.show(); df.printSchema(); df.createOrReplaceTempView("baidu_com_spark2"); Dataset
resut = sqlContext.sql("select * from baidu_com_spark2"); resut.show(); sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark10") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val data: RDD[String] = sc.parallelize(Array( "{\"name\":\"zhangsan\",\"age\":18}", "{\"name\":\"lisi\",\"age\":19}", "{\"name\":\"wangwu\",\"age\":20}" )) val df = sqlContext.read.json(data) df.show() df.printSchema() df.createOrReplaceTempView("baidukt_com_spark1.6") val result = sqlContext.sql("select * from baidukt_com_spark1.6") result.show() result.printSchema() sc.stop() }
3、非json格式的RDD创建DataFrame
3.1 通过反射的方式将非json格式的RDD转换成DataFrame(不推荐,所以不复制代码过来了)
3.2、态创建Schema将非json格式的RDD转换成DataFrame
4、读取parquet文件创建DataFrame(多次io 不推荐)
5、读取JDBC中的数据创建DataFrame(MySql为例)
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("mysql"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); /** * 第一种方式读取MySql数据库表,加载为DataFrame */ Mapoptions = new HashMap (); options.put("url", "jdbc:mysql://localhost:3306/spark");//连接地址和数据库名称 options.put("driver", "com.mysql.jdbc.Driver");//驱动 options.put("user", "root");//用户名 options.put("password", "admin");//密码 options.put("dbtable", "person");//表 Dataset person = sqlContext.read().format("jdbc").options(options).load(); person.show(); //注册临时表 person.registerTempTable("person"); /** * 第二种方式读取MySql数据表加载为DataFrame */ DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url", "jdbc:mysql://localhost:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "admin"); reader.option("dbtable", "score"); Dataset
score = reader.load(); score.show(); score.registerTempTable("score"); Dataset
result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name"); result.show(); /** * 将DataFrame结果保存到Mysql中 */ Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "admin"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties); sc.stop(); }
scala (spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("mysql") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * 第一种方式读取Mysql数据库表创建DF */ val options = new mutable.HashMap[String,String](); options.put("url", "jdbc:mysql://localhost:3306/spark") options.put("driver","com.mysql.jdbc.Driver") options.put("user","root") options.put("password", "admin") options.put("dbtable","person") val person = sqlContext.read.format("jdbc").options(options).load() person.show() person.registerTempTable("person") /** * 第二种方式读取Mysql数据库表创建DF */ val reader = sqlContext.read.format("jdbc") reader.option("url", "jdbc:mysql://localhost:3306/spark") reader.option("driver","com.mysql.jdbc.Driver") reader.option("user","root") reader.option("password","admin") reader.option("dbtable", "score") val score = reader.load() score.show() score.registerTempTable("score") val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name") result.show() /** * 将数据写入到Mysql表中 */ val properties = new Properties() properties.setProperty("user", "root") properties.setProperty("password", "admin") result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties) sc.stop() }
6、读取Hive中的数据加载成DataFrame
HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("hive"); JavaSparkContext sc = new JavaSparkContext(conf); //HiveContext是SQLContext的子类。 HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("USE spark"); hiveContext.sql("DROP TABLE IF EXISTS student_infos"); //在hive中创建student_infos表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos"); hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores"); /** * 查询表生成DataFrame */ DatasetgoodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80"); hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.registerTempTable("goodstudent"); Dataset
result = hiveContext.sql("select * from goodstudent"); result.show(); result.printSchema(); /** * 将结果保存到hive表 good_student_infos */ goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("HiveSource") val sc = new SparkContext(conf) /** * HiveContext是SQLContext的子类。 */ val hiveContext = new HiveContext(sc) hiveContext.sql("use spark") hiveContext.sql("drop table if exists student_infos") hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos") hiveContext.sql("drop table if exists student_scores") hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores") val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") hiveContext.sql("drop table if exists good_student_infos") /** * 将结果写入到hive表中 */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos") sc.stop() }
7、自定义udf
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark13") val spark = SparkSession.builder().config(conf).getOrCreate() //rdd转df val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi")) val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_)) val schema = DataTypes.createStructType(Array(StructField("name",StringType,true))) val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema) df.show(50) df.printSchema() df.createOrReplaceTempView("test") //自定义udf函数 函数名为StrLen,参数为String、Int String有问题,网上说需要java.lang.String类型 // spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})// spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})// spark.sql("select name ,StrLen(name,10) as length from test").show(20) spark.sql("select name ,StrLen(10) as length from test").show(20) }
java (spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("udf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDDparallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu")); JavaRDD rowRDD = parallelize.map(new Function
() { private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List fields = new ArrayList (); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); Dataset df = sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx */ sqlContext.udf().register("StrLen", new UDF1
() { private static final long serialVersionUID = 1L; @Override public Integer call(String t1) throws Exception { return t1.length(); } }, DataTypes.IntegerType); sqlContext.sql("select name ,StrLen(name) as length from user").show(); sc.stop(); }
感谢各位的阅读!关于"sparl sql有哪些"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
数据
方式
格式
数据库
函数
子类
结果
视图
信息
内容
参数
周期
形式
文件
更多
树形
生命
用户
篇文章
推荐
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
广州电商软件开发价格
2021年军营网络安全周主题
怀旧服掉线后所有服务器都没了
软件开发公司注册条件
计算机网络技术实体是什么
倩女幽魂2服务器列表
此域名没有数据库
基因库属于下列哪种数据库
网络安全宣传图片高清大图
冬奥会网络安全保障论文
幼儿园网络安全课件
高级网络安全顾问前期做什么
服务器运维工程师转正申请书
实行网络安全保护
社交网络安全插画
违反国家网络安全 刑事拘留
资阳软件开发简介
公安大学网络安全与执法分数
浪达网络技术有限公司怎么样
服务器主机用什么接口
软件开发和嵌入式哪个好
计算机本科生去华为软件开发
在上海软件开发 工资高吗
计算机网络技术还可以叫什么
网络技术专家工资
意大利网络安全会议
绿色地狱服务器好不好
imperva数据库安全
高邑水性软件开发服务咨询报价
德国网络安全规划2020