
What Spark SQL can do


This article walks through common Spark SQL operations: creating DataFrames from JSON files and RDDs, reading Parquet, JDBC, and Hive data, and defining custom UDFs. It is shared here as a practical reference.

1. Creating a DataFrame from a JSON file

Java (Spark 1.6)

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("javaSpark01");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
//  DataFrame df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json");
    DataFrame df2 = sqlContext.read().json("G:/idea/scala/spark02/json");
    df2.show();
    // Print the schema as a tree
    df2.printSchema();
    // Register the DataFrame as a temporary table. The table lives only in memory as a
    // logical table; it is not materialized to disk.
    df2.registerTempTable("baidukt_table");
    DataFrame sql = sqlContext.sql("select * from baidukt_table");
    sql.show();
    DataFrame sql1 = sqlContext.sql("select age, count(1) from baidukt_table group by age");
    sql1.show();
}

Scala (Spark 1.6)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local").setAppName("Spark08 1.6")
  val sc = new SparkContext(conf)
  val sqlContext: SQLContext = new SQLContext(sc)
  val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")
//  val df1 = sqlContext.read.json("G:/idea/scala/spark02/json")
  // Show the first 50 rows
  df.show(50)
  // Print the schema as a tree
  df.printSchema()
  // Register a temporary table
  df.registerTempTable("baidukt_com_table")
  val result = sqlContext.sql("select age, count(1) from baidukt_com_table group by age")
  result.show()
  val result1 = sqlContext.sql("select * from baidukt_com_table")
  result1.show()
  sc.stop()
}

Java (Spark 2.0+)

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    Dataset<Row> df = spark.read().json("G:/idea/scala/spark02/json");
//  Dataset<Row> df1 = spark.read().format("json").load("G:/idea/scala/spark02/json");
    df.show();
    df.printSchema();
    // A global temp view is bound to the system database global_temp,
    // so it must be referenced with that prefix.
    df.createOrReplaceGlobalTempView("baidu_com_spark2");
    Dataset<Row> result = spark.sql("select * from global_temp.baidu_com_spark2");
    result.show();
    spark.stop();
}

Scala (Spark 2.0+)

def main(args: Array[String]): Unit = {
  // Current working directory of the user
//  val location = System.setProperties("user.dir", "spark_2.0")
  val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  // Load the data
  val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")
//  val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json")
  // Show the data
  df.show()
  // Print the schema
  df.printSchema()
  // To query with Spark SQL, first register the DataFrame as a temporary view.
  // createOrReplaceTempView: creates a temporary view whose lifetime is tied to the SparkSession
  // that created the Dataset.
  // createGlobalTempView: creates a global temporary view whose lifetime is tied to the Spark application.
  df.createOrReplaceTempView("people")
  val result: DataFrame = spark.sql("select * from people")
  result.show()
  spark.stop()
}

2. Creating a DataFrame from a JSON-formatted RDD

Java (Spark 1.6)

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("jsonRDD");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> data = sc.parallelize(Arrays.asList(
            "{\"name\":\"zhangsan\",\"score\":\"100\"}",
            "{\"name\":\"lisi\",\"score\":\"200\"}",
            "{\"name\":\"wangwu\",\"score\":\"300\"}"
    ));
    DataFrame df = sqlContext.read().json(data);
    df.show();
    df.printSchema();
    df.registerTempTable("baidu_com_spark2");
    DataFrame result = sqlContext.sql("select * from baidu_com_spark2");
    result.show();
    sc.stop();
}

Scala (Spark 1.6)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("spark10")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val data: RDD[String] = sc.parallelize(Array(
    "{\"name\":\"zhangsan\",\"age\":18}",
    "{\"name\":\"lisi\",\"age\":19}",
    "{\"name\":\"wangwu\",\"age\":20}"
  ))
  val df = sqlContext.read.json(data)
  df.show()
  df.printSchema()
  // Table names containing a dot are parsed as database.table, so a plain name is used here.
  df.registerTempTable("baidukt_com_spark16")
  val result = sqlContext.sql("select * from baidukt_com_spark16")
  result.show()
  result.printSchema()
  sc.stop()
}

3. Creating a DataFrame from a non-JSON RDD

3.1 Converting a non-JSON RDD to a DataFrame via reflection (not recommended, so the original code is not reproduced; a minimal sketch follows below)
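The original post omits the code for the reflection approach, so the sketch below is only illustrative: it assumes the Spark 1.6 Scala API, a Person case class, and a comma-separated input file, none of which come from the original.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Person is an illustrative case class; it must be declared outside main for reflection to work.
case class Person(name: String, age: Int)

object ReflectionDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reflectionDF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Each input line is assumed to look like "zhangsan,18" (the path is also an assumption).
    val df = sc.textFile("G:/idea/scala/spark02/person.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    df.printSchema()
    df.registerTempTable("person")
    sqlContext.sql("select name, age from person where age >= 18").show()
    sc.stop()
  }
}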

3.2 Converting a non-JSON RDD to a DataFrame by dynamically building the schema (a minimal sketch follows below)
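No code is given for the dynamic-schema approach either. A minimal sketch with the Spark 1.6 Scala API might look like the following; the sample records and field names are illustrative assumptions. The key idea is to build an RDD[Row] and a matching StructType at runtime and pass both to createDataFrame.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DynamicSchemaDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("dynamicSchemaDF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Assume each record is "name,age"; build Rows and a matching StructType at runtime.
    val rowRDD: RDD[Row] = sc.parallelize(Array("zhangsan,18", "lisi,19", "wangwu,20"))
      .map(_.split(","))
      .map(arr => Row(arr(0), arr(1).trim.toInt))
    val schema = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.printSchema()
    df.registerTempTable("person")
    sqlContext.sql("select name, age from person order by age").show()
    sc.stop()
  }
}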

4. Creating a DataFrame by reading Parquet files (involves multiple I/O passes, so the author does not recommend it; a minimal sketch follows below)
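No code accompanies the Parquet section. A minimal sketch with the Spark 1.6 Scala API could look like the following; the input and output paths are illustrative assumptions. It writes a DataFrame out as Parquet and then loads it back with read.parquet.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object ParquetDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("parquetDF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Read JSON, save it as Parquet, then load the Parquet files back into a DataFrame.
    val jsonDF = sqlContext.read.json("G:/idea/scala/spark02/json")
    jsonDF.write.mode(SaveMode.Overwrite).parquet("G:/idea/scala/spark02/parquet")
    val parquetDF = sqlContext.read.parquet("G:/idea/scala/spark02/parquet")
    parquetDF.show()
    parquetDF.printSchema()
    sc.stop()
  }
}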

5. Creating a DataFrame from a JDBC source (MySQL as the example)

Java (Spark 1.6)

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("mysql");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    /**
     * First way to read a MySQL table and load it as a DataFrame
     */
    Map<String, String> options = new HashMap<>();
    options.put("url", "jdbc:mysql://localhost:3306/spark"); // connection URL and database name
    options.put("driver", "com.mysql.jdbc.Driver");          // JDBC driver
    options.put("user", "root");                             // user name
    options.put("password", "admin");                        // password
    options.put("dbtable", "person");                        // table
    DataFrame person = sqlContext.read().format("jdbc").options(options).load();
    person.show();
    // Register a temporary table
    person.registerTempTable("person");
    /**
     * Second way to read a MySQL table and load it as a DataFrame
     */
    DataFrameReader reader = sqlContext.read().format("jdbc");
    reader.option("url", "jdbc:mysql://localhost:3306/spark");
    reader.option("driver", "com.mysql.jdbc.Driver");
    reader.option("user", "root");
    reader.option("password", "admin");
    reader.option("dbtable", "score");
    DataFrame score = reader.load();
    score.show();
    score.registerTempTable("score");
    DataFrame result = sqlContext.sql("select person.id, person.name, score.score from person, score where person.name = score.name");
    result.show();
    /**
     * Save the DataFrame result back to MySQL
     */
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "admin");
    result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties);
    sc.stop();
}

Scala (Spark 1.6)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local").setAppName("mysql")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  /**
    * First way to read a MySQL table and create a DataFrame
    */
  val options = new mutable.HashMap[String, String]()
  options.put("url", "jdbc:mysql://localhost:3306/spark")
  options.put("driver", "com.mysql.jdbc.Driver")
  options.put("user", "root")
  options.put("password", "admin")
  options.put("dbtable", "person")
  val person = sqlContext.read.format("jdbc").options(options).load()
  person.show()
  person.registerTempTable("person")
  /**
    * Second way to read a MySQL table and create a DataFrame
    */
  val reader = sqlContext.read.format("jdbc")
  reader.option("url", "jdbc:mysql://localhost:3306/spark")
  reader.option("driver", "com.mysql.jdbc.Driver")
  reader.option("user", "root")
  reader.option("password", "admin")
  reader.option("dbtable", "score")
  val score = reader.load()
  score.show()
  score.registerTempTable("score")
  val result = sqlContext.sql("select person.id, person.name, score.score from person, score where person.name = score.name")
  result.show()
  /**
    * Write the result back to a MySQL table
    */
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "admin")
  result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)
  sc.stop()
}

6. Loading data from Hive into a DataFrame

HiveContext is a subclass of SQLContext; when connecting to Hive, HiveContext is recommended.

Java (Spark 1.6)

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("hive");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // HiveContext is a subclass of SQLContext.
    HiveContext hiveContext = new HiveContext(sc);
    hiveContext.sql("USE spark");
    hiveContext.sql("DROP TABLE IF EXISTS student_infos");
    // Create the student_infos table in Hive
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT) row format delimited fields terminated by '\t'");
    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
    hiveContext.sql("DROP TABLE IF EXISTS student_scores");
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
    hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores' INTO TABLE student_scores");
    /**
     * Query the tables and build a DataFrame
     */
    DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");
    hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
    goodStudentsDF.registerTempTable("goodstudent");
    DataFrame result = hiveContext.sql("select * from goodstudent");
    result.show();
    result.printSchema();
    /**
     * Save the result into the Hive table good_student_infos
     */
    goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
    for (Row goodStudentRow : goodStudentRows) {
        System.out.println(goodStudentRow);
    }
    sc.stop();
}

Scala (Spark 1.6)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setAppName("HiveSource")
  val sc = new SparkContext(conf)
  /**
    * HiveContext is a subclass of SQLContext.
    */
  val hiveContext = new HiveContext(sc)
  hiveContext.sql("use spark")
  hiveContext.sql("drop table if exists student_infos")
  hiveContext.sql("create table if not exists student_infos (name string, age int) row format delimited fields terminated by '\t'")
  hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
  hiveContext.sql("drop table if exists student_scores")
  hiveContext.sql("create table if not exists student_scores (name string, score int) row format delimited fields terminated by '\t'")
  hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
  val df = hiveContext.sql("select si.name, si.age, ss.score from student_infos si, student_scores ss where si.name = ss.name")
  hiveContext.sql("drop table if exists good_student_infos")
  /**
    * Write the result into a Hive table
    */
  df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
  sc.stop()
}

7. Custom UDFs

Scala (Spark 2.0+, since the code uses SparkSession)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("spark13")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  // Convert an RDD to a DataFrame
  val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan", "wangwu", "masi"))
  val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_))
  val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
  val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD, schema)
  df.show(50)
  df.printSchema()
  df.createOrReplaceTempView("test")
  // Register a UDF named StrLen. The (String, Int) version below caused problems for the author;
  // reports online suggest declaring the parameter as java.lang.String.
//  spark.sqlContext.udf.register("StrLen", (s: String, i: Int) => { s.length + i })
//  spark.sql("select name, StrLen(name, 10) as length from test").show(20)
  spark.sqlContext.udf.register("StrLen", (i: Int) => { i })
  spark.sql("select name, StrLen(10) as length from test").show(20)
  spark.stop()
}

Java (Spark 1.6)

public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local");
    conf.setAppName("udf");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan", "lisi", "wangwu"));
    JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Row call(String s) throws Exception {
            return RowFactory.create(s);
        }
    });
    List<StructField> fields = new ArrayList<>();
    fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
    df.registerTempTable("user");
    /**
     * The number of UDF parameters determines which interface to implement: UDF1, UDF2, ...
     */
    sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(String t1) throws Exception {
            return t1.length();
        }
    }, DataTypes.IntegerType);
    sqlContext.sql("select name, StrLen(name) as length from user").show();
    sc.stop();
}

Thanks for reading! That concludes this overview of what Spark SQL can do; hopefully it serves as a useful reference.
