千家信息网

Spark Data Sources怎么使用

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"Spark Data Sources怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark Data Sources怎么使用"
千家信息网最后更新 2025年02月03日Spark Data Sources怎么使用

本篇内容主要讲解"Spark Data Sources怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark Data Sources怎么使用"吧!

一:Data Sources(数据源):

1.1 了解数据源。

Spark SQL 支持对各种数据源通过DataFrame接口操作。DataFrame 可以作为正常 的RDDs进行操作,也可以注册为一个临时表。
注册DataFrame为一个表允许您在其数据运行 SQL 查询。本节介绍用于加载和保存使用 Spark数据源的数据的一般方法,然后再进入到可用的内置数据源的特定选项。

1.2 Generic Load/Save Functions(通用加载/保存功能)。

最简单的形式,默认的数据源 (parquet除非否则配置由 spark.sql.sources.default) 将用于所有操作。

eg:第一种读取方式:通过 parquetFile("xxx") 来读取

首先把spark-1.6.1-bin-hadoop2.6\examples\src\main\resources下的users.parquet上传到HDFS上。

public class SparkSqlDemo4 {        private static String appName = "Test Spark RDD";        private static String master = "local";        public static void main(String[] args) {                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext(master, appName, conf);                //创建了 sqlContext的上下文,注意,它是DataFrame的起点。                SQLContext sqlContext = new SQLContext(sc);                DataFrame df = sqlContext.read().load("hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet");                                df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");                //指定保存模式                  //df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");                  //第一种读取方式                  DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");                parquetFile.registerTempTable("users");                  DataFrame df1 = sqlContext.sql("SELECT name,favorite_color FROM users ");                                  df1.show();                                List listString = df1.javaRDD().map(new Function() {                          private static final long serialVersionUID = 1L;                        public String call(Row row) {                                  return "Name: " + row.getString(0) + " ,FavoriteColor: " + row.getString(1);                          }                  }).collect();                  for (String string : listString) {                        System.out.println(  string );                }        }}

输出结果如下:

+------+--------------+|  name|favorite_color|+------+--------------+|Alyssa|          null||   Ben|           red|+------+--------------+Name: Alyssa ,FavoriteColor: nullName: Ben ,FavoriteColor: red

1.3 Manually Specifying Options(手动指定选项):

你可以也手动指定的数据源,将与您想要将传递给数据源的任何额外选项一起使用。

数据源由其完全限定名称 (即 org.apache.spark.sql.parquet),

但对于内置来源您还可以使用 他们短名称(json, parquet, jdbc)。

DataFrames 任何类型可以转换为其他类型,使用此语法。

eg:第二种读取方式:通过 parquet("xxx") 来读取

public class SparkSqlDemo5 {        private static String appName = "Test Spark RDD";        private static String master = "local";        public static void main(String[] args) {                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext(master, appName, conf);                //创建了 sqlContext的上下文,注意,它是DataFrame的起点。                SQLContext sqlContext = new SQLContext(sc);                DataFrame df = sqlContext.read().format("json").load("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");                df.select("id", "name","sex","age").write().format("parquet").save("people.parquet");                DataFrame parquetFile = sqlContext.read().parquet("people.parquet");                                  parquetFile.registerTempTable("people");                  DataFrame df1 = sqlContext.sql("SELECT id,name,sex,age FROM people WHERE age >= 21 AND age <= 23");                  df1.show();                                df1.printSchema();                List listString = df1.javaRDD().map(new Function() {                          private static final long serialVersionUID = 1L;                        public String call(Row row) {                                  return "Id: " + row.getString(0) + ", Name: "+row.getString(1) + ",Sex" +row.getString(2)+ ", Age: " + row.getLong(3);                        }                }).collect();                  for (String string : listString) {                        System.out.println(  string );                }        }}

1.4 Run SQL on files directly(直接在文件上运行SQL)

您也可以查询该文件直接使用 SQL,并对其进行查询,而不是使用 API 读取文件加载到DataFrame。

public class SparkSqlDemo6 {        private static String appName = "Test Spark RDD";        private static String master = "local";        public static void main(String[] args) {                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext(master, appName, conf);                //创建了 sqlContext的上下文,注意,它是DataFrame的起点。                SQLContext sqlContext = new SQLContext(sc);                // 注意 sql 语句 parquet 后面目录的符号 。                 DataFrame df = sqlContext.sql("SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`");                                df.show();        }}

注意:sql语句中红色部分标记的符号。

"SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`"

二:Save Modes(保存模式):

Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。

eg:指定保存模式:
df.select("name","favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)当保存DataFrame到一个数据源,如果数据已经存在,将会引发异常。
SaveMode.Append"append"


当保存DataFrame到一个数据源,如果数据/表已经存在,DataFrame的内容预计将追加到现有数据后面。

SaveMode.Overwrite"overwrite"覆盖模式意味着当保存DataFrame到一个数据源,如果数据/表已存在,现有数据预计将覆盖原有的DataFrame内容。
SaveMode.Ignore"ignore"Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。

三:Parquet 文件

Parquet是一种列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。

到此,相信大家对"Spark Data Sources怎么使用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0