sparl sql有哪些
发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、读取json格式的文件创建DataFramejava (spark1.6
千家信息网最后更新 2024年11月26日sparl sql有哪些
这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
1、读取json格式的文件创建DataFrame
java (spark1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("javaSpark01"); SparkContext sc = new SparkContext(conf); SQLContext sqlContext = new SQLContext(sc);// Datasetdf = sqlContext.read().format("json").load("G:/idea/scala/spark02/json"); Dataset
df2 = sqlContext.read().json("G:/idea/scala/spark02/json"); df2.show(); //树形的形式显示schema信息 df2.printSchema(); //注册临时表 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘 df2.registerTempTable("baidukt_table"); Dataset
sql = sqlContext.sql("select * from baidukt_table"); sql.show(); Dataset
sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age"); sql1.show(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("Spark08 1.6") val sc = new SparkContext(conf) val sqlContext: SQLContext = new SQLContext(sc) val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")// val df1 = sqlContext.read.json("G:/idea/scala/spark02/json") //显示前50行数据 df.show(50) //树形的形式显示schema信息 df.printSchema() //注册临时表 df.registerTempTable("baidukt_com_table") val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age") result.show() val result1 = sqlContext.sql("select * from baidukt_com_table") result1.show() sc.stop() }
java (spark 2.0++)
public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); Datasetdf = spark.read().json("G:/idea/scala/spark02/json");// Dataset
df1 = spark.read().format("json").load("G:/idea/scala/spark02/json"); df.show(); df.printSchema(); df.createOrReplaceGlobalTempView("baidu_com_spark2"); Dataset
resut = spark.sql("select * from baidu_com_spark2"); resut.show(); spark.stop(); }
scala(spark 2.0++)
def main(args: Array[String]): Unit = { //用户的当前工作目录// val location = System.setProperties("user.dir","spark_2.0" val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]") val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //数据导入方式 val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")// val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json") //查看表 df.show() //查看表 df.printSchema() //直接使用spark SQL进行查询 //先注册为临时表 //createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联。 //createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。 df.createOrReplaceTempView("people") val result: DataFrame = spark.sql("select * from people") result.show() spark.stop() }
2、通过json格式的RDD创建DataFrame
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonRDD"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDDdata = sc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"lisi\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); Dataset df = sqlContext.read().json(data); df.show(); df.printSchema(); df.createOrReplaceTempView("baidu_com_spark2"); Dataset
resut = sqlContext.sql("select * from baidu_com_spark2"); resut.show(); sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark10") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val data: RDD[String] = sc.parallelize(Array( "{\"name\":\"zhangsan\",\"age\":18}", "{\"name\":\"lisi\",\"age\":19}", "{\"name\":\"wangwu\",\"age\":20}" )) val df = sqlContext.read.json(data) df.show() df.printSchema() df.createOrReplaceTempView("baidukt_com_spark1.6") val result = sqlContext.sql("select * from baidukt_com_spark1.6") result.show() result.printSchema() sc.stop() }
3、非json格式的RDD创建DataFrame
3.1 通过反射的方式将非json格式的RDD转换成DataFrame(不推荐,所以不复制代码过来了)
3.2、态创建Schema将非json格式的RDD转换成DataFrame
4、读取parquet文件创建DataFrame(多次io 不推荐)
5、读取JDBC中的数据创建DataFrame(MySql为例)
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("mysql"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); /** * 第一种方式读取MySql数据库表,加载为DataFrame */ Mapoptions = new HashMap (); options.put("url", "jdbc:mysql://localhost:3306/spark");//连接地址和数据库名称 options.put("driver", "com.mysql.jdbc.Driver");//驱动 options.put("user", "root");//用户名 options.put("password", "admin");//密码 options.put("dbtable", "person");//表 Dataset person = sqlContext.read().format("jdbc").options(options).load(); person.show(); //注册临时表 person.registerTempTable("person"); /** * 第二种方式读取MySql数据表加载为DataFrame */ DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url", "jdbc:mysql://localhost:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "admin"); reader.option("dbtable", "score"); Dataset
score = reader.load(); score.show(); score.registerTempTable("score"); Dataset
result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name"); result.show(); /** * 将DataFrame结果保存到Mysql中 */ Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "admin"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties); sc.stop(); }
scala (spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("mysql") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * 第一种方式读取Mysql数据库表创建DF */ val options = new mutable.HashMap[String,String](); options.put("url", "jdbc:mysql://localhost:3306/spark") options.put("driver","com.mysql.jdbc.Driver") options.put("user","root") options.put("password", "admin") options.put("dbtable","person") val person = sqlContext.read.format("jdbc").options(options).load() person.show() person.registerTempTable("person") /** * 第二种方式读取Mysql数据库表创建DF */ val reader = sqlContext.read.format("jdbc") reader.option("url", "jdbc:mysql://localhost:3306/spark") reader.option("driver","com.mysql.jdbc.Driver") reader.option("user","root") reader.option("password","admin") reader.option("dbtable", "score") val score = reader.load() score.show() score.registerTempTable("score") val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name") result.show() /** * 将数据写入到Mysql表中 */ val properties = new Properties() properties.setProperty("user", "root") properties.setProperty("password", "admin") result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties) sc.stop() }
6、读取Hive中的数据加载成DataFrame
HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("hive"); JavaSparkContext sc = new JavaSparkContext(conf); //HiveContext是SQLContext的子类。 HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("USE spark"); hiveContext.sql("DROP TABLE IF EXISTS student_infos"); //在hive中创建student_infos表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos"); hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores"); /** * 查询表生成DataFrame */ DatasetgoodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80"); hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.registerTempTable("goodstudent"); Dataset
result = hiveContext.sql("select * from goodstudent"); result.show(); result.printSchema(); /** * 将结果保存到hive表 good_student_infos */ goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("HiveSource") val sc = new SparkContext(conf) /** * HiveContext是SQLContext的子类。 */ val hiveContext = new HiveContext(sc) hiveContext.sql("use spark") hiveContext.sql("drop table if exists student_infos") hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos") hiveContext.sql("drop table if exists student_scores") hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores") val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") hiveContext.sql("drop table if exists good_student_infos") /** * 将结果写入到hive表中 */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos") sc.stop() }
7、自定义udf
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark13") val spark = SparkSession.builder().config(conf).getOrCreate() //rdd转df val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi")) val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_)) val schema = DataTypes.createStructType(Array(StructField("name",StringType,true))) val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema) df.show(50) df.printSchema() df.createOrReplaceTempView("test") //自定义udf函数 函数名为StrLen,参数为String、Int String有问题,网上说需要java.lang.String类型 // spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})// spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})// spark.sql("select name ,StrLen(name,10) as length from test").show(20) spark.sql("select name ,StrLen(10) as length from test").show(20) }
java (spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("udf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDDparallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu")); JavaRDD rowRDD = parallelize.map(new Function
() { private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List fields = new ArrayList (); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); Dataset df = sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx */ sqlContext.udf().register("StrLen", new UDF1
() { private static final long serialVersionUID = 1L; @Override public Integer call(String t1) throws Exception { return t1.length(); } }, DataTypes.IntegerType); sqlContext.sql("select name ,StrLen(name) as length from user").show(); sc.stop(); }
感谢各位的阅读!关于"sparl sql有哪些"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
数据
方式
格式
数据库
函数
子类
结果
视图
信息
内容
参数
周期
形式
文件
更多
树形
生命
用户
篇文章
推荐
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全在我身边内容
xp数据库关闭
人为疏忽造成网络安全
吉林软件开发自学课程
服务器节能措施
mysql数据库免备案
php上传图片到服务器
局域网禁用服务器
网络安全的反思
灵活用工平台中服务器要求
服务器专利大全
电子地图软件开发
上海美品网络技术有限公司
网络安全技术与实践书本习题练习
代谢组学常用数据库
广安软件开发优化价格
考数据库系统概论的专业
sql数据库条件更改
邯郸交友软件开发公司
软件开发找工作室靠谱吗
云服务器优惠活动
明日之后忘了自己的服务器怎么办
675w服务器电源
2018甘肃网络安全宣传
u盘数据库怎么导出
ps4登录账号无法连接服务器
网络安全学科包含
盈峰网络技术有限公司面试
基础研究物性数据库
新能源软件开发职责