千家信息网

8.spark core之读写数据

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,  spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。文件系统  文件系统主要有本地文件系统、Amazon S3、HDFS等。  文件系统中存储的文件有多种存储格式。spark支持的
千家信息网最后更新 2025年01月24日8.spark core之读写数据

  spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。

文件系统

  文件系统主要有本地文件系统、Amazon S3、HDFS等。

  文件系统中存储的文件有多种存储格式。spark支持的一些常见格式有:

格式名称结构化说明
文件文件普通文件文件,每行一条记录
JSON半结构化常见的基于文本的半结构化数据
CSV常见的基于文本的格式,在电子表格应用中使用
SequenceFiles一种用于键值对数据的常见Hadoop文件格式

文本文件

  • 读取

    • 读取单个文件,参数为文件全路径,输入的每一行都会成为RDD的一个元素。

      • python
      input = sc.textFile("file://opt/module/spark/README.md")
      • scala
      val input = sc.textFile("file://opt/module/spark/README.md")
      • java
      JavaRDD input = sc.textFile("file://opt/module/spark/README.md")
    • 读取多个文件时,可以使用textFile将参数改为目录或以逗号文件的多个文件名即可。如果是小文件,也可以使用wholeTextFiles读取为一个Pair RDD(键是文件名,值是文件内容)。
    val input = sc.wholeTextFiles("file://opt/module/spark/datas")val result = input.mapValues{    y => {        val nums = y.split(" ").map(x => x.toDouble)        nums.sum / nums.size.toDouble    }}
  • 写入

  输出文本文件时,可使用saveAsTextFile()方法接收一个目录,将RDD中的内容输出到目录中的多个文件中。

```result.saveAsTextFile(outputFile)```

JSON

  • 读取

    • 将数据作为文本文件读取,然后使用JSON解析器对数据进行解析。
    • python使用内置库读取JSON
    import json...input = sc.textFile("file.json")data = input.map(lambda x: json.loads(x))
    • scala使用Jackson读取JSON
    import com.fasterxml.jackson.databind.ObjectMapperimport com.fasterxml.jackson.module.scala.DefaultScalaModule...case class Person(name: String, lovesPandas: Boolean)...val input = sc.textFile("file.json")val mapper = new ObjectMapper()mapper.registerModule(DefaultScalaModule)val result = input.flatMap(record => {    try {        Some(mapper.readValue(record, classOf[Person]))    } catch {        case e: Exception => None    }})
    • java使用Jackson读取JSON
    class ParseJson implements FlatMapFunction, Person> {    public Iterable call(Iterator lines) throws Exception {        ArrayList people = new ArrayList();        ObjectMapper mapper = new ObjectMapper();        while(lines.hasNext()) {            String line = lines.next();            try {                people.add(mapper.readValue(line, Person.class));                } catch(Exception e) {                //跳过失败的数据            }        }        return people;    }}JavaRDD input = sc.textFile("file.json");JavaRDD result = input.mapPartitions(new ParseJson());
  • 写入

    • 使用JSON解析器将结构化的RDD转为字符串RDD,然后使用文本文件API输出。
    • python
    (data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)
    • scala
    result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
    • java
    class WriteJson implements FlatMapFunction, String> {    public Iterable call(Iterator people) throws Exception {        ArrayList text = new ArrayList();        ObjectMapper mapper = new ObjectMapper();        while(people.hasNext()) {            Person person = people.next();            text.add(mapper.writeValueAsString(person));        }        return text;    }}JavaRDD result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());JavaRDD formatted = result.mapPartitions(new WriteJson());formatted.saveAsTextFile(outfile);

CSV与TSV

  CSV与TSV文件每行都有固定的字段,字段之间使用分隔符(CSV使用逗号;tsv使用制表符)分隔。

  • 读取

    • 将csv或tsv文件当作普通文本文件读取,然后使用响应的解析器进行解析,同json处理方式。

    • python使用内置库读取csv

      • 文件中所有字段没有包含换行符
      import csvimport StringIO...def loadRecord(line):    input = StringIO.StringIO(line)    reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"])    return reader.next()"""读取每行记录"""    input = sc.textFile(inputFile).map(loadRecord)
      • 文件中的字段包含换行符
      def loadRecords(fileNameContents):    input = StringIO.StringIO(fileNameContents[1])    reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"])    return reader"""读取整个文件"""fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
    • scala使用opencsv库读取csv

      • 文件中所有字段没有包含换行符
      import Java.io.StringReaderimport au.com.bytecode.opencsv.CSVReader...val input = sc.textFile(inputFile)val result = input.map{    line => {        val reader = new CSVReader(new StringReader(line))        reader.readNext()    }}
      • 文件中的字段包含换行符
      case class Person(name: String, favoriteAnimal: String)val input = sc.wholeTextFiles(inputFile)val result = input.flatMap(    case(_, txt) => {        val reader = new CSVReader(new StringReader(txt))        reader.readAll().map(x => Person(x(0), x(1)))    }
    • java使用opencsv库读取csv

      • 文件中所有字段没有包含换行符
      import Java.io.StringReaderimport au.com.bytecode.opencsv.CSVReader...public static class ParseLine implements Function {    public String[] call(String line) throws Exception {        CSVReader reader = new CSVReader(new StringReader(line));        return reader.readNext();    }}JavaPairRDD csvData = sc.textFile(inputFile).map(new ParseLine());
      • 文件中的字段包含换行符
      public static class ParseLine implements FlatMapFunction, String[]> {    public Iterable call(Tuple2 file) throws Exception {        CSVReader reader = new CSVReader(new StringReader(file._2);        return reader.readAll();    }}JavaRDD keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());
  • 写入

    • csv或tsv文件输出时,将个字段转为指定顺序的数组,然后采用普通文本文件的方式进行输出。
    • python
    def writeRecords(records):    output = StringIO.StringIO()    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])    for record in records:        writer.writerow(record)    return [output.getValue()]pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
    • scala
    pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{    people => {    val stringWriter = new StringWriter()    val csvWriter = new CSVWriter(stringWriter)    csvWriter.writeAll(people.toList)    Iterator(stringWriter.toString)        }}.saveAsTextFile(outFile)

SequenceFile

  SequenceFile是键值对形式的常用Hadoop数据格式。由于Hadoop使用一套自定义的序列化框架,因此SequenceFile的键值对类型需实现Hadoop的Writable接口。

  • 读取

    • python
    data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
    • scala
    val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}    
    • java
    public static class ConvertToNativeTypes implements PairFunction, String, Integer> {    public Tuple2 call(Tuple2 record) {        return new Tuple2(record._1.toString(), record._2.get());    }}JavaPairRDD result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());
  • 写入

    • python
    data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])data.saveAsSequeceFile(outputFile)
    • scala
    val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))data.saveAsSequenceFile(outputFile)
    • java(java中没有saveAsSequenceFile方法,用自定义hadoop格式的方式实现)
    public static class ConvertToWritableTypes implements PairFunction, Text, IntWritable> {    public Tuple2 call(Tuple2 record) {        return new Tuple2(new Text(record._1), new IntWritable(record._2));    }}JavaPairRDD result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes());result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);

数据库

  数据库主要分为关系型数据库(MySQL、PostgreSQL等)和非关系型数据库(HBase、ElasticSearch等)。

JDBC数据库连接

  spark使用JDBC访问关系型数据库(MySQL、PostgreSQL等),只需要构建一个org.apache.spark.rdd.JdbcRDD即可。

def createConnection() = {    Class.forName("com.mysql.jdbc.Driver").newInstance()    DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")}def extractValues(r: ResultSet) = {    (r.getInt(1), r.getString(2))}val data = new JdbcRDD(sc, createConnection,                 "SELECT * FROM panda WHERE id >= ? AND id <= ?"),                lowerBound = 1, upperBound = 3,                 numPartitions = 2, mapRow = extractValues)println(data.collect().toList)

HBase

  spark通过Hadoop输入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)访问HBase。这种格式返回键值对数据,键类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable,值类型为org.apache.hadoop.hbase.client.Result。

import org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.client.Resultimport org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormatval conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE, "tablename")val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], ClassOf[Result])

ElasticSearch

  spark使用ElasticSearch-Hadoop连接器从ElasticSearch中读写数据。ElasticSearch连接器依赖于SparkContext设置的配置项。ElasticSearch连接器也没有用到Spark封装的类型,而使用saveAsHadoopDataSet。

  • 读取
def mapWritableToInput(in: MapWritable): Map[String, String] = {    in.map{case (k, v) => (k.toString, v.toString)}.toMap}val jobConf = new JobConf(sc.hadoopConfiguration)jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args[1])jobConf.set(ConfigurationOptions.ES_NODES, args[2])val currentTweets = sc.hadoopRDD(jobConf, classOf[EsInputFormat[Object, MapWritable]], classOf[Object], ClassOf[MapWritable])val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
  • 写入
val jobConf = new JobConf(sc.hadoopConfiguration)jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutFormat")jobConf.setOutputCommitter(classOf[FileOutputCommitter])jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")jobConf.set(ConfigurationOptions.ES_NODES, "localhost")FileOutputFormat.setOutputPath(jobConf, new Path("-"))output.saveAsHadoopDataset(jobConf)

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

0