千家信息网

ORC文件读写工具类和Flink输出ORC格式文件的方法

发表于:2024-10-20 作者:千家信息网编辑
千家信息网最后更新 2024年10月20日,本篇内容主要讲解"ORC文件读写工具类和Flink输出ORC格式文件的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ORC文件读写工具类和Flink输
千家信息网最后更新 2024年10月20日ORC文件读写工具类和Flink输出ORC格式文件的方法

本篇内容主要讲解"ORC文件读写工具类和Flink输出ORC格式文件的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ORC文件读写工具类和Flink输出ORC格式文件的方法"吧!

一.ORC文件:

压缩

压缩比例在1:7到1:10之间,3份副本的话会节省接近10倍空间

调查数据周末要给出

数据压缩后要注意负载均衡问题,可以尝试reblance

导出

hive的orc文件使用sqoop导出到mysql使用hcatalog直接增加一些配置参数即可

查看

以json方式查看orc文件

hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0

下载

以KV形式查看orc文件

hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt

orc读取会查找字段在min和max中的值,不包含则跳过,所以速度会快

二,orc读写工具类

注意事项: 在windows读写时,请务必保证classpath ,path中不要有hadoop的环境变量! 如果有,请先删除,并且重启IDE

2.1 读:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.Reader;import org.apache.orc.RecordReader;import org.apache.orc.TypeDescription;import java.io.IOException;public class CoreReader {  public static void main(Configuration conf, String[] args) throws IOException {    // Get the information from the file footer    Reader reader = OrcFile.createReader(new Path("my-file.orc"),                                         OrcFile.readerOptions(conf));    System.out.println("File schema: " + reader.getSchema());    System.out.println("Row count: " + reader.getNumberOfRows());    // Pick the schema we want to read using schema evolution    TypeDescription readSchema =        TypeDescription.fromString("struct");    // Read the row data    VectorizedRowBatch batch = readSchema.createRowBatch();    RecordReader rowIterator = reader.rows(reader.options()                                             .schema(readSchema));    LongColumnVector z = (LongColumnVector) batch.cols[0];    BytesColumnVector y = (BytesColumnVector) batch.cols[1];    LongColumnVector x = (LongColumnVector) batch.cols[2];    while (rowIterator.nextBatch(batch)) {      for(int row=0; row < batch.size; ++row) {        int zRow = z.isRepeating ? 0: row;        int xRow = x.isRepeating ? 0: row;        System.out.println("z: " +            (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null));        System.out.println("y: " + y.toString(row));        System.out.println("x: " +            (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null));      }    }    rowIterator.close();  }  public static void main(String[] args) throws IOException {    main(new Configuration(), args);  }}

2.2,写:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import java.io.IOException;import java.nio.charset.StandardCharsets;public class CoreWriter {  public static void main(Configuration conf, String[] args) throws IOException {    TypeDescription schema =      TypeDescription.fromString("struct");    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),                                         OrcFile.writerOptions(conf)                                          .setSchema(schema));    VectorizedRowBatch batch = schema.createRowBatch();    LongColumnVector x = (LongColumnVector) batch.cols[0];    BytesColumnVector y = (BytesColumnVector) batch.cols[1];for(int r=0; r < 10000; ++r) {      int row = batch.size++;      x.vector[row] = r;      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);      y.setRef(row, buffer, 0, buffer.length);      // If the batch is full, write it out and start over.      if (batch.size == batch.getMaxSize()) {        writer.addRowBatch(batch);        batch.reset();      }    }if (batch.size != 0) {      writer.addRowBatch(batch);    }    writer.close();  }  public static void main(String[] args) throws IOException {main(new Configuration(), args);  }}

2.3 Flink Sink ORC文件示例:(基于flink1.12.3版本)

import org.apache.flink.core.fs.Path;import org.apache.flink.orc.OrcSplitReaderUtil;import org.apache.flink.orc.vector.RowDataVectorizer;import org.apache.flink.orc.writer.OrcBulkWriterFactory;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.RowData;import org.apache.flink.table.types.logical.DoubleType;import org.apache.flink.table.types.logical.IntType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.table.types.logical.VarCharType;import org.apache.hadoop.conf.Configuration;import org.apache.orc.TypeDescription;import java.util.Properties;public class StreamingWriteFileOrc {    public static void main(String[] args) throws Exception{        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(10000);        env.setParallelism(1);        DataStream dataStream = env.addSource(                new MySource());        //写入orc格式的属性        final Properties writerProps = new Properties();        writerProps.setProperty("orc.compress", "LZ4");        //定义类型和字段名        LogicalType[] orcTypes = new LogicalType[]{                new IntType(), new DoubleType(), new VarCharType()};        String[] fields = new String[]{"a", "b", "c"};        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(                orcTypes,                fields));        //构造工厂类OrcBulkWriterFactory        final OrcBulkWriterFactory factory = new OrcBulkWriterFactory<>(                new RowDataVectorizer(typeDescription.toString(), orcTypes),                writerProps,                new Configuration());        StreamingFileSink orcSink = StreamingFileSink                .forBulkFormat(new Path("file:///tmp/aaaa"), factory)                .build();        dataStream.addSink(orcSink);        env.execute();    }    public static class MySource implements SourceFunction{        @Override        public void run(SourceContext sourceContext) throws Exception{            while (true){                GenericRowData rowData = new GenericRowData(3);                rowData.setField(0, (int) (Math.random() * 100));                rowData.setField(1, Math.random() * 100);                rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100)));                sourceContext.collect(rowData);                Thread.sleep(1);            }        }        @Override        public void cancel(){        }    }}

2.4 POM依赖

            UTF-8        1.8        UTF-8        UTF-8        1.8        UTF-8        1.8        2.11        2.11        1.12.3        1.2.0        1.7.21        1.3.1        compile                                 commons-cli            commons-cli            1.4                            commons-codec            commons-codec            1.15                                    junit            junit            4.11            test                            org.apache.hbase            hbase-client            ${hbase.version}                                                org.apache.hadoop                    hadoop-yarn-common                                                    org.apache.hadoop                    hadoop-yarn-api                                                    hadoop-mapreduce-client-core                    org.apache.hadoop                                                    hadoop-auth                    org.apache.hadoop                                                    hadoop-common                    org.apache.hadoop                                                        commons-lang            commons-lang            2.6                            org.apache.commons            commons-lang3            3.3.2                            mysql            mysql-connector-java            5.1.47                            com.alibaba            fastjson            1.2.28                            org.apache.flink            flink-java            ${flink.cluster.version}            ${scope.value}                            org.apache.flink            flink-table            ${flink.cluster.version}            pom            ${scope.value}                            org.apache.flink            flink-table-api-scala-bridge_2.11            ${flink.cluster.version}            ${scope.value}                            org.apache.flink            flink-table-api-java-bridge_2.11            ${flink.cluster.version}            ${scope.value}                            org.apache.flink            flink-connector-filesystem_2.11            1.11.3                            org.apache.flink            flink-connector-filesystem_${scala.version}            1.11.3                                    org.apache.flink            flink-orc_2.11            1.12.3            ${scope.value}                            org.apache.flink            flink-ml_${scala.version}            1.8.1            ${scope.value}                                    org.apache.flink            flink-table-planner-blink_2.11            ${flink.cluster.version}            ${scope.value}                                    org.apache.flink            flink-table-common            ${flink.cluster.version}            ${scope.value}                                    org.apache.flink            flink-streaming-java_${scala.version}            1.12.3            ${scope.value}                            org.apache.flink            flink-streaming-scala_${scala.version}            ${flink.cluster.version}                                                commons-lang3                    org.apache.commons                                                    commons-cli                    commons-cli                                        ${scope.value}                            org.apache.flink            flink-connector-kafka_${scala.version}            ${flink.cluster.version}                                                log4j                    log4j                                                    org.slf4j                    slf4j-log4j12                                                        org.apache.hadoop            hadoop-common            2.7.3            ${scope.value}                            org.apache.hadoop            hadoop-hdfs            2.7.3            ${scope.value}                                                xml-apis                    xml-apis                                                        org.apache.flink            flink-parquet_${scala.version}            ${flink.cluster.version}                            org.apache.flink            flink-avro            ${flink.cluster.version}                                    org.slf4j            slf4j-api            ${slf4j.version}                            ch.qos.logback            logback-core            ${logback.version}                            ch.qos.logback            logback-classic            ${logback.version}                                    redis.clients            jedis            3.0.0                            org.apache.commons            commons-pool2            2.5.0                                    com.alibaba            druid            1.0.11                            org.apache.flink            flink-clients_2.11            ${flink.cluster.version}                            org.apache.hive            hive-jdbc            1.2.1                                    org.apache.hadoop            hadoop-client            2.7.3            

到此,相信大家对"ORC文件读写工具类和Flink输出ORC格式文件的方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0