
Hadoop 2.6.0 Learning Notes (3): Hadoop Serialization


Lu Chunli's work notes: who says programmers can't have a touch of artistic flair?


In distributed data processing, serialization and deserialization are used mainly in two areas: inter-process communication and persistent storage.

Serialization is the process of converting structured data into a byte stream so that it can be transmitted over the network or written to disk for persistent storage; deserialization is the reverse process of turning a byte stream back into structured objects.


Communication between Hadoop's node processes is implemented with RPC. Hadoop does not use Java's built-in serialization mechanism; instead it defines two serialization-related interfaces, Writable and Comparable, from which a combined WritableComparable interface is derived.


In Hadoop, the Writable interface defines two methods:

write, which serializes the object's fields to a binary DataOutput stream;

readFields, which deserializes the object's fields from a binary DataInput stream.

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    /**
     * Serialize the fields of this object to out.
     */
    void write(DataOutput out) throws IOException;

    /**
     * Deserialize the fields of this object from in.
     */
    void readFields(DataInput in) throws IOException;
}
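As a quick illustration of these two methods (a minimal sketch of mine, not part of the original notes), an IntWritable can be round-tripped through an in-memory byte array:

// Minimal round-trip sketch: serialize with write(), restore with readFields().
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        IntWritable original = new IntWritable(2015);

        // write(DataOutput): object -> bytes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields(DataInput): bytes -> object
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored.get());   // prints 2015
    }
}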

The structure of the Writable interface hierarchy in Hadoop is:

Writable
    WritableComparable
        IntWritable         int (fixed-length)
        VIntWritable        int (variable-length)
        BooleanWritable     boolean
        ByteWritable        byte (single byte)
        ShortWritable       short
        FloatWritable       float
        DoubleWritable      double
        LongWritable        long (fixed-length)
        VLongWritable       long (variable-length)
        Text                Writable for UTF-8 sequences, generally regarded as the equivalent of java.lang.String
        BytesWritable       byte sequence
    ArrayWritable           array
    TwoDArrayWritable       two-dimensional array
    MapWritable             implements Map
    SortedMapWritable       implements SortedMap
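The fixed-length and variable-length variants differ in their encoded size. A small sketch of mine (assuming the standard org.apache.hadoop.io classes listed above) that prints the number of bytes each one produces:

// Sketch: serialized size of fixed-length IntWritable vs variable-length VIntWritable.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

public class FixedVsVariableLength {
    // Serialize any Writable into a byte array and report its length
    static int serializedSize(Writable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serializedSize(new IntWritable(100)));      // always 4 bytes
        System.out.println(serializedSize(new VIntWritable(100)));     // 1 byte for small values
        System.out.println(serializedSize(new VIntWritable(1000000))); // grows with the value, up to 5 bytes
    }
}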


The WritableComparable interface adds type comparison, and comparing types is crucial to MapReduce, since keys are sorted during the shuffle.

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

The WritableComparator class is a general-purpose implementation: 1. it provides a default implementation of the raw compare() method; 2. it acts as a factory for RawComparator instances (for registered Writable implementations).

package org.apache.hadoop.io;

public class WritableComparator implements RawComparator, Configurable {
    /** For backwards compatibility. **/
    public static WritableComparator get(Class<? extends WritableComparable> c) {
        return get(c, null);
    }

    /** Get a comparator for a {@link WritableComparable} implementation. */
    public static WritableComparator get(Class<? extends WritableComparable> c, Configuration conf) {
        // ......
    }
}
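To use a custom type as a MapReduce key, it has to implement WritableComparable. Here is a minimal sketch of mine (the SiteCount class and its single field are hypothetical, not part of the original notes):

// Hypothetical key type: serializable via write/readFields, sortable via compareTo.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SiteCount implements WritableComparable<SiteCount> {
    private long pv;

    public SiteCount() {
        // no-arg constructor: required, Hadoop instantiates the class via reflection
    }

    public SiteCount(long pv) {
        this.pv = pv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pv = in.readLong();
    }

    @Override
    public int compareTo(SiteCount other) {
        return Long.compare(other.pv, this.pv);   // larger pv sorts first
    }
}

For a real key, hashCode() (and equals()) should also be overridden, because the default HashPartitioner uses hashCode() to assign keys to reducers.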


A simple example

package com.lucl.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * 
 * @author lucl
 *
 */
public class CustomizeComparator {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        RawComparator comparator = WritableComparator.get(IntWritable.class);
        
        IntWritable int003 = new IntWritable(300);
        IntWritable int004 = new IntWritable(400);
        
        // Use an in-memory byte array to turn the first Writable into bytes
        ByteArrayOutputStream bytes001 = new ByteArrayOutputStream();
        DataOutput out001 = new DataOutputStream(bytes001);
        try {
            int003.write(out001);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b1 = bytes001.toByteArray();
        
        // Turn the second Writable into bytes the same way
        ByteArrayOutputStream bytes002 = new ByteArrayOutputStream();
        DataOutput out002 = new DataOutputStream(bytes002);
        try {
            int004.write(out002);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b2 = bytes002.toByteArray();
        
        // Compare the serialized byte representations directly (raw comparison)
        int comvalue = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println("comvalue : " + comvalue);
        
        // Compare the underlying int values for reference
        int value1 = int003.get();
        int value2 = int004.get();
        if (value1 > value2) {
            System.out.println("value1 > value2");
        } else {
            System.out.println("value1 < value2");
        }
    }
}
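Run as-is, both checks should agree: comparator.compare returns a negative value because 300 is less than 400, and the plain comparison prints value1 < value2.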


The MapReduce program

The data to be processed (page views and traffic for different categories of websites):

[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
视频网站        15      1527
信息安全        20      3156
站点统计        24      6960
搜索引擎        28      3659
站点统计        3       1938
综合门户        15      1938
搜索引擎        21      9531
搜索引擎        63      11058

The custom serialization class

package com.lucl.hadoop.mapreduce.serialize;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;
    
    // No-arg constructor: required, Hadoop instantiates the class via reflection during deserialization
    public CustomizeWritable() {
        // ......
    }
    
    public CustomizeWritable(String pv, String flow) {
        this(Long.valueOf(pv), Long.valueOf(flow));
    }
    
    public CustomizeWritable(Long pv, Long flow) {
        this.pv = pv;
        this.flow = flow;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
        out.writeLong(flow);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written
        pv = in.readLong();
        flow = in.readLong();
    }
    
    public Long getPv() {
        return pv;
    }
    
    public void setPv(Long pv) {
        this.pv = pv;
    }
    
    public Long getFlow() {
        return flow;
    }
    
    public void setFlow(Long flow) {
        this.flow = flow;
    }
    
    @Override
    public String toString() {
        return this.pv + "\t" + this.flow;
    }
}

Mapper code

package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WritableMapper extends Mapper<LongWritable, Text, Text, CustomizeWritable> {
    Text text = new Text();
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is: site type \t pv \t flow
        String[] splited = value.toString().split("\t");
        
        text.set(splited[0]);
        CustomizeWritable wr = new CustomizeWritable(splited[1], splited[2]);
        
        context.write(text, wr);
    }
}

Reducer code

package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WritableReducer extends Reducer<Text, CustomizeWritable, Text, CustomizeWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomizeWritable> values, Context context)
            throws IOException, InterruptedException {
        Long pv = 0L;
        Long flow = 0L;
        
        // Accumulate pv and flow for all records of the same site type
        for (CustomizeWritable customizeWritable : values) {
            pv += customizeWritable.getPv();
            flow += customizeWritable.getFlow();
        }
        
        CustomizeWritable wr = new CustomizeWritable(pv, flow);
        
        context.write(key, wr);
    }
}

Driver class

package com.lucl.hadoop.mapreduce.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * @author lucl
 */
public class CustomizeWritableMRApp extends Configured implements Tool {
    private static final Logger logger = Logger.getLogger(CustomizeWritableMRApp.class);
    
    public static void main(String[] args) {
        try {
            ToolRunner.run(new CustomizeWritableMRApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            logger.info("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        
        job.setJarByClass(CustomizeWritableMRApp.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        /**
         * map
         */
        job.setMapperClass(WritableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomizeWritable.class);
        
        /**
         * reduce
         */
        job.setReducerClass(WritableReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomizeWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Running the program

[hadoop@nnode code]$ hadoop jar WRApp.jar /data/HTTP_SITE_FLOW.log /201511291404
15/11/29 14:44:13 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/29 14:44:15 INFO input.FileInputFormat: Total input paths to process : 1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: number of splits:1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448763754600_0004
15/11/29 14:44:15 INFO impl.YarnClientImpl: Submitted application application_1448763754600_0004
15/11/29 14:44:15 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448763754600_0004/
15/11/29 14:44:15 INFO mapreduce.Job: Running job: job_1448763754600_0004
15/11/29 14:44:45 INFO mapreduce.Job: Job job_1448763754600_0004 running in uber mode : false
15/11/29 14:44:45 INFO mapreduce.Job:  map 0% reduce 0%
15/11/29 14:45:14 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:45:34 INFO mapreduce.Job:  map 100% reduce 100%
15/11/29 14:45:34 INFO mapreduce.Job: Job job_1448763754600_0004 completed successfully
15/11/29 14:45:34 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=216031
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=107
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=27010
                Total time spent by all reduces in occupied slots (ms)=16429
                Total time spent by all map tasks (ms)=27010
                Total time spent by all reduce tasks (ms)=16429
                Total vcore-seconds taken by all map tasks=27010
                Total vcore-seconds taken by all reduce tasks=16429
                Total megabyte-seconds taken by all map tasks=27658240
                Total megabyte-seconds taken by all reduce tasks=16823296
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=5
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=5
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=167
                CPU time spent (ms)=2320
                Physical memory (bytes) snapshot=304205824
                Virtual memory (bytes) snapshot=1695969280
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=174
        File Output Format Counters 
                Bytes Written=107
[hadoop@nnode code]$

Checking the results

[hadoop@nnode code]$ hdfs dfs -ls /201511291404
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-29 14:45 /201511291404/_SUCCESS
-rw-r--r--   2 hadoop hadoop        107 2015-11-29 14:45 /201511291404/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511291404/part-r-00000
信息安全        20      3156
搜索引擎        112     24248
站点统计        27      8898
综合门户        15      1938
视频网站        15      1527
[hadoop@nnode code]$

Note: the first run failed with the following error:

15/11/29 14:41:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:42:04 INFO mapreduce.Job: Task Id : attempt_1448763754600_0003_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
        at java.lang.Class.getConstructor0(Class.java:2892)
        at java.lang.Class.getDeclaredConstructor(Class.java:2058)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
        ... 13 more

Searching online turned up the cause: "when defining a custom Writable, note that reflection is used during deserialization and it calls the no-argument constructor, so a no-arg constructor must be defined."
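The reason is that Hadoop first creates an empty instance of the value class via reflection and then calls readFields() on it to populate the fields. A minimal sketch of mine that reproduces the failure outside MapReduce (the BrokenWritable class is hypothetical and deliberately lacks a no-arg constructor):

// Sketch: reflection-based instantiation fails without a no-arg constructor.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class NoArgConstructorDemo {
    // Hypothetical Writable WITHOUT a no-arg constructor
    static class BrokenWritable implements Writable {
        private long pv;
        BrokenWritable(long pv) { this.pv = pv; }
        @Override public void write(DataOutput out) throws IOException { out.writeLong(pv); }
        @Override public void readFields(DataInput in) throws IOException { pv = in.readLong(); }
    }

    public static void main(String[] args) throws Exception {
        // Throws NoSuchMethodException: BrokenWritable.<init>() -- the same root cause as above
        Writable w = BrokenWritable.class.getDeclaredConstructor().newInstance();
        System.out.println(w);
    }
}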
