千家信息网

sparl sql有哪些

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、读取json格式的文件创建DataFramejava (spark1.6
千家信息网最后更新 2024年11月26日sparl sql有哪些

这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

1、读取json格式的文件创建DataFrame

java (spark1.6)

public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setMaster("local").setAppName("javaSpark01");        SparkContext sc = new SparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);//        Dataset df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json");        Dataset df2 = sqlContext.read().json("G:/idea/scala/spark02/json");        df2.show();        //树形的形式显示schema信息        df2.printSchema();        //注册临时表 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘        df2.registerTempTable("baidukt_table");        Dataset sql = sqlContext.sql("select * from baidukt_table");        sql.show();        Dataset sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age");        sql1.show(); }

scala(spark 1.6)

def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setMaster("local").setAppName("Spark08 1.6")     val sc = new SparkContext(conf)    val sqlContext: SQLContext = new SQLContext(sc)    val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")//    val df1 = sqlContext.read.json("G:/idea/scala/spark02/json")    //显示前50行数据    df.show(50)    //树形的形式显示schema信息    df.printSchema()    //注册临时表    df.registerTempTable("baidukt_com_table")    val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age")    result.show()    val result1 = sqlContext.sql("select * from baidukt_com_table")    result1.show()    sc.stop()  }

java (spark 2.0++)

public static void main(String[] args) {        SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++");        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();        Dataset df = spark.read().json("G:/idea/scala/spark02/json");//        Dataset df1 = spark.read().format("json").load("G:/idea/scala/spark02/json");        df.show();        df.printSchema();        df.createOrReplaceGlobalTempView("baidu_com_spark2");        Dataset resut = spark.sql("select * from baidu_com_spark2");        resut.show();        spark.stop();    }

scala(spark 2.0++)

  def main(args: Array[String]): Unit = {    //用户的当前工作目录//    val  location = System.setProperties("user.dir","spark_2.0"    val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]")    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()    //数据导入方式    val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")//    val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json")    //查看表    df.show()    //查看表    df.printSchema()    //直接使用spark SQL进行查询    //先注册为临时表    //createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联。    //createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。    df.createOrReplaceTempView("people")    val result: DataFrame = spark.sql("select * from people")    result.show()    spark.stop()  }

2、通过json格式的RDD创建DataFrame

java(spark 1.6)

public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setMaster("local").setAppName("jsonRDD");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);        JavaRDD data = sc.parallelize(Arrays.asList(                "{\"name\":\"zhangsan\",\"score\":\"100\"}",                "{\"name\":\"lisi\",\"score\":\"200\"}",                "{\"name\":\"wangwu\",\"score\":\"300\"}"        ));        Dataset df = sqlContext.read().json(data);        df.show();        df.printSchema();        df.createOrReplaceTempView("baidu_com_spark2");        Dataset resut = sqlContext.sql("select * from baidu_com_spark2");        resut.show();        sc.stop();    }

scala(spark 1.6)

def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("spark10")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    val data: RDD[String] = sc.parallelize(Array(      "{\"name\":\"zhangsan\",\"age\":18}",      "{\"name\":\"lisi\",\"age\":19}",      "{\"name\":\"wangwu\",\"age\":20}"    ))    val df = sqlContext.read.json(data)    df.show()    df.printSchema()    df.createOrReplaceTempView("baidukt_com_spark1.6")    val result = sqlContext.sql("select * from baidukt_com_spark1.6")    result.show()    result.printSchema()    sc.stop()  }

3、非json格式的RDD创建DataFrame

3.1 通过反射的方式将非json格式的RDD转换成DataFrame(不推荐,所以不复制代码过来了)

3.2、态创建Schema将非json格式的RDD转换成DataFrame

4、读取parquet文件创建DataFrame(多次io 不推荐)

5、读取JDBC中的数据创建DataFrame(MySql为例)

java(spark 1.6)

   public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setMaster("local").setAppName("mysql");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);        /**         * 第一种方式读取MySql数据库表,加载为DataFrame         */        Map options = new HashMap();        options.put("url", "jdbc:mysql://localhost:3306/spark");//连接地址和数据库名称        options.put("driver", "com.mysql.jdbc.Driver");//驱动        options.put("user", "root");//用户名        options.put("password", "admin");//密码        options.put("dbtable", "person");//表        Dataset person = sqlContext.read().format("jdbc").options(options).load();        person.show();        //注册临时表        person.registerTempTable("person");        /**         * 第二种方式读取MySql数据表加载为DataFrame         */        DataFrameReader reader = sqlContext.read().format("jdbc");        reader.option("url", "jdbc:mysql://localhost:3306/spark");        reader.option("driver", "com.mysql.jdbc.Driver");        reader.option("user", "root");        reader.option("password", "admin");        reader.option("dbtable", "score");        Dataset score = reader.load();        score.show();        score.registerTempTable("score");        Dataset result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");        result.show();        /**         * 将DataFrame结果保存到Mysql中         */        Properties properties = new Properties();        properties.setProperty("user", "root");        properties.setProperty("password", "admin");        result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties);        sc.stop();    }

scala (spark 1.6)

  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setMaster("local").setAppName("mysql")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    /**      * 第一种方式读取Mysql数据库表创建DF      */    val options = new mutable.HashMap[String,String]();    options.put("url", "jdbc:mysql://localhost:3306/spark")    options.put("driver","com.mysql.jdbc.Driver")    options.put("user","root")    options.put("password", "admin")    options.put("dbtable","person")    val person = sqlContext.read.format("jdbc").options(options).load()    person.show()    person.registerTempTable("person")    /**      * 第二种方式读取Mysql数据库表创建DF      */    val reader = sqlContext.read.format("jdbc")    reader.option("url", "jdbc:mysql://localhost:3306/spark")    reader.option("driver","com.mysql.jdbc.Driver")    reader.option("user","root")    reader.option("password","admin")    reader.option("dbtable", "score")    val score = reader.load()    score.show()    score.registerTempTable("score")    val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")    result.show()    /**      * 将数据写入到Mysql表中      */    val properties = new Properties()    properties.setProperty("user", "root")    properties.setProperty("password", "admin")    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)    sc.stop()  }

6、读取Hive中的数据加载成DataFrame

HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。

java(spark 1.6)

public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setAppName("hive");        JavaSparkContext sc = new JavaSparkContext(conf);        //HiveContext是SQLContext的子类。        HiveContext hiveContext = new HiveContext(sc);        hiveContext.sql("USE spark");        hiveContext.sql("DROP TABLE IF EXISTS student_infos");        //在hive中创建student_infos表        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");        hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");        hiveContext.sql("DROP TABLE IF EXISTS student_scores");        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");        hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores");        /**         * 查询表生成DataFrame         */        Dataset goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");        hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");        goodStudentsDF.registerTempTable("goodstudent");        Dataset result = hiveContext.sql("select * from goodstudent");        result.show();        result.printSchema();        /**         * 将结果保存到hive表 good_student_infos         */        goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");        Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();        for(Row goodStudentRow : goodStudentRows) {            System.out.println(goodStudentRow);        }        sc.stop();    }

scala(spark 1.6)

  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setAppName("HiveSource")    val sc = new SparkContext(conf)    /**      * HiveContext是SQLContext的子类。      */    val hiveContext = new HiveContext(sc)    hiveContext.sql("use spark")    hiveContext.sql("drop table if exists student_infos")    hiveContext.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by '\t'")    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")    hiveContext.sql("drop table if exists student_scores")    hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")    hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")    val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")    hiveContext.sql("drop table if exists good_student_infos")    /**      * 将结果写入到hive表中      */    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")    sc.stop()  }

7、自定义udf

scala(spark 1.6)

 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("spark13")    val spark = SparkSession.builder().config(conf).getOrCreate()    //rdd转df    val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi"))    val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_))    val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))    val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema)    df.show(50)        df.printSchema()    df.createOrReplaceTempView("test")    //自定义udf函数 函数名为StrLen,参数为String、Int String有问题,网上说需要java.lang.String类型 //    spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})//    spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})//    spark.sql("select name ,StrLen(name,10) as length from test").show(20)    spark.sql("select name ,StrLen(10) as length from test").show(20)  }

java (spark 1.6)

  public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setMaster("local");        conf.setAppName("udf");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);        JavaRDD parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));        JavaRDD rowRDD = parallelize.map(new Function() {            private static final long serialVersionUID = 1L;            @Override            public Row call(String s) throws Exception {                return RowFactory.create(s);            }        });        List fields = new ArrayList();        fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));        StructType schema = DataTypes.createStructType(fields);        Dataset df = sqlContext.createDataFrame(rowRDD, schema);        df.registerTempTable("user");        /**         * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx         */        sqlContext.udf().register("StrLen", new UDF1() {            private static final long serialVersionUID = 1L;            @Override            public Integer call(String t1) throws Exception {                return t1.length();            }        }, DataTypes.IntegerType);        sqlContext.sql("select name ,StrLen(name) as length from user").show();        sc.stop();    }

感谢各位的阅读!关于"sparl sql有哪些"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

数据 方式 格式 数据库 函数 子类 结果 视图 信息 内容 参数 周期 形式 文件 更多 树形 生命 用户 篇文章 推荐 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 广州电商软件开发价格 2021年军营网络安全周主题 怀旧服掉线后所有服务器都没了 软件开发公司注册条件 计算机网络技术实体是什么 倩女幽魂2服务器列表 此域名没有数据库 基因库属于下列哪种数据库 网络安全宣传图片高清大图 冬奥会网络安全保障论文 幼儿园网络安全课件 高级网络安全顾问前期做什么 服务器运维工程师转正申请书 实行网络安全保护 社交网络安全插画 违反国家网络安全 刑事拘留 资阳软件开发简介 公安大学网络安全与执法分数 浪达网络技术有限公司怎么样 服务器主机用什么接口 软件开发和嵌入式哪个好 计算机本科生去华为软件开发 在上海软件开发 工资高吗 计算机网络技术还可以叫什么 网络技术专家工资 意大利网络安全会议 绿色地狱服务器好不好 imperva数据库安全 高邑水性软件开发服务咨询报价 德国网络安全规划2020
0