Hadoop 2.6.0 Study Notes (3): Hadoop Serialization
Lu Chunli's work notes. Who says programmers can't have a literary side?
In distributed data processing, serialization and deserialization are used mainly in two areas: inter-process communication and persistent storage.
Serialization is the process of converting structured data into a byte stream so that it can be transmitted over the network or written to disk for persistent storage; deserialization is the reverse process of turning a byte stream back into structured objects.
Communication between Hadoop node processes is implemented with RPC. Hadoop does not use Java's built-in serialization mechanism; instead it defines two serialization-related interfaces, Writable and Comparable, from which the WritableComparable interface is derived.
In Hadoop, the Writable interface defines two methods:
one writes the object's data to a DataOutput stream in binary format;
the other reads the object's data back from a DataInput stream in binary format.
package org.apache.hadoop.io;

public interface Writable {
    /** Serialize the fields of this object to out. */
    void write(DataOutput out) throws IOException;

    /** Deserialize the fields of this object from in. */
    void readFields(DataInput in) throws IOException;
}
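As a quick, self-contained sketch (my own illustration, not from the original notes) of how these two methods are used, an IntWritable can be round-tripped through an in-memory byte array with the standard java.io streams:

// A minimal sketch: serialize an IntWritable to bytes and read it back.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        IntWritable original = new IntWritable(163);

        // write(DataOutput): object -> byte stream
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));
        byte[] bytes = bos.toByteArray();   // 4 bytes for a fixed-length int

        // readFields(DataInput): byte stream -> object
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));

        System.out.println(bytes.length + " bytes, value = " + restored.get());
    }
}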
The main Writable (and WritableComparable) implementations in Hadoop are:
Class                Wrapped type / description
IntWritable          int (fixed-length)
VIntWritable         int (variable-length)
BooleanWritable      boolean
ByteWritable         byte (single byte)
ShortWritable        short
FloatWritable        float
DoubleWritable       double
LongWritable         long (fixed-length)
VLongWritable        long (variable-length)
Text                 Writable for UTF-8 sequences; generally treated as the equivalent of java.lang.String
BytesWritable        byte sequence
ArrayWritable        array
TwoDArrayWritable    two-dimensional array
MapWritable          implements Map
SortedMapWritable    implements SortedMap
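To make the fixed-length versus variable-length distinction concrete, the following small sketch (my own illustration, using only the Hadoop classes listed above) serializes values with IntWritable and VIntWritable and prints the encoded sizes:

// Compare fixed-length and variable-length integer encodings.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

public class EncodingSizes {
    // Serialize a Writable into an in-memory buffer and return the number of bytes used.
    private static int serializedSize(Writable w) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bos));
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        // IntWritable always occupies 4 bytes, regardless of the value.
        System.out.println("IntWritable(1)       : " + serializedSize(new IntWritable(1)) + " bytes");
        // VIntWritable uses 1 to 5 bytes depending on the magnitude of the value.
        System.out.println("VIntWritable(1)      : " + serializedSize(new VIntWritable(1)) + " bytes");
        System.out.println("VIntWritable(1000000): " + serializedSize(new VIntWritable(1000000)) + " bytes");
    }
}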
The WritableComparable interface adds type comparison, and type comparison is crucial to MapReduce.
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

The WritableComparator class is a general-purpose implementation. It:
1. provides a default implementation of the raw compare() method;
2. acts as a factory for RawComparator instances (for registered Writable implementations).

package org.apache.hadoop.io;

public class WritableComparator implements RawComparator, Configurable {
    /** For backwards compatibility. **/
    public static WritableComparator get(Class<? extends WritableComparable> c) {
        return get(c, null);
    }

    /** Get a comparator for a {@link WritableComparable} implementation. */
    public static WritableComparator get(Class<? extends WritableComparable> c,
            Configuration conf) {
        // ......
    }
}
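The "registered Writable implementations" mentioned above refers to a pattern the built-in types follow: each class registers a raw comparator with WritableComparator.define() in a static block, so that WritableComparator.get() can hand it back later. The sketch below mirrors that pattern for a hypothetical key class (MyKey is my own example, not part of the original notes):

// Sketch of the registration pattern used by Hadoop's built-in Writable types.
// MyKey is a hypothetical WritableComparable with a single long field.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyKey implements WritableComparable<MyKey> {
    private long id;

    public MyKey() { }                        // no-arg constructor needed for reflection
    public MyKey(long id) { this.id = id; }

    @Override
    public void write(DataOutput out) throws IOException { out.writeLong(id); }

    @Override
    public void readFields(DataInput in) throws IOException { id = in.readLong(); }

    @Override
    public int compareTo(MyKey other) { return Long.compare(this.id, other.id); }

    /** Raw comparator that compares the serialized bytes directly. */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(MyKey.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // The key is a single 8-byte long, so compare it without deserializing.
            return Long.compare(readLong(b1, s1), readLong(b2, s2));
        }
    }

    static {
        // Register the raw comparator so WritableComparator.get(MyKey.class) returns it.
        WritableComparator.define(MyKey.class, new Comparator());
    }
}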
A simple example
package com.lucl.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author lucl
 */
public class CustomizeComparator {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

        IntWritable int003 = new IntWritable(300);
        IntWritable int004 = new IntWritable(400);

        // Use an in-memory byte array to convert the first Writable to bytes
        ByteArrayOutputStream bytes001 = new ByteArrayOutputStream();
        DataOutput out001 = new DataOutputStream(bytes001);
        try {
            int003.write(out001);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b1 = bytes001.toByteArray();

        // Use an in-memory byte array to convert the second Writable to bytes
        ByteArrayOutputStream bytes002 = new ByteArrayOutputStream();
        DataOutput out002 = new DataOutputStream(bytes002);
        try {
            int004.write(out002);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b2 = bytes002.toByteArray();

        // Compare the two values on their serialized byte representations
        int comvalue = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println("comvalue : " + comvalue);

        // Compare the int values directly on the original (deserialized) values
        int value1 = int003.get();
        int value2 = int004.get();
        if (value1 > value2) {
            System.out.println("value1 > value2");
        } else {
            System.out.println("value1 < value2");
        }
    }
}
A MapReduce program
The data to be processed (page views and traffic for different categories of websites):
[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
视频网站	15	1527
信息安全	20	3156
站点统计	24	6960
搜索引擎	28	3659
站点统计	3	1938
综合门户	15	1938
搜索引擎	21	9531
搜索引擎	63	11058
The custom serialization class
package com.lucl.hadoop.mapreduce.serialize;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;

    // A no-argument constructor is required: Hadoop instantiates the class via reflection.
    public CustomizeWritable() {
        // ......
    }

    public CustomizeWritable(String pv, String flow) {
        this(Long.valueOf(pv), Long.valueOf(flow));
    }

    public CustomizeWritable(Long pv, Long flow) {
        this.pv = pv;
        this.flow = flow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
        out.writeLong(flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pv = in.readLong();
        flow = in.readLong();
    }

    public Long getPv() {
        return pv;
    }

    public void setPv(Long pv) {
        this.pv = pv;
    }

    public Long getFlow() {
        return flow;
    }

    public void setFlow(Long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return this.pv + "\t" + this.flow;
    }
}
Mapper code
package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WritableMapper extends Mapper<LongWritable, Text, Text, CustomizeWritable> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        text.set(splited[0]);
        CustomizeWritable wr = new CustomizeWritable(splited[1], splited[2]);
        context.write(text, wr);
    }
}
Reducer code
package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WritableReducer extends Reducer<Text, CustomizeWritable, Text, CustomizeWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomizeWritable> values, Context context)
            throws IOException, InterruptedException {
        Long pv = 0L;
        Long flow = 0L;
        for (CustomizeWritable customizeWritable : values) {
            pv += customizeWritable.getPv();
            flow += customizeWritable.getFlow();
        }
        CustomizeWritable wr = new CustomizeWritable(pv, flow);
        context.write(key, wr);
    }
}
Driver class
package com.lucl.hadoop.mapreduce.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.lucl.hadoop.mapreduce.customize.MyWordCountApp;

/**
 * @author lucl
 */
public class CustomizeWritableMRApp extends Configured implements Tool {
    private static final Logger logger = Logger.getLogger(MyWordCountApp.class);

    public static void main(String[] args) {
        try {
            ToolRunner.run(new CustomizeWritableMRApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            logger.info("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(CustomizeWritableMRApp.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        /**
         * map
         */
        job.setMapperClass(WritableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomizeWritable.class);

        /**
         * reduce
         */
        job.setReducerClass(WritableReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomizeWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the program
[hadoop@nnode code]$ hadoop jar WRApp.jar /data/HTTP_SITE_FLOW.log /201511291404
15/11/29 14:44:13 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/29 14:44:15 INFO input.FileInputFormat: Total input paths to process : 1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: number of splits:1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448763754600_0004
15/11/29 14:44:15 INFO impl.YarnClientImpl: Submitted application application_1448763754600_0004
15/11/29 14:44:15 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448763754600_0004/
15/11/29 14:44:15 INFO mapreduce.Job: Running job: job_1448763754600_0004
15/11/29 14:44:45 INFO mapreduce.Job: Job job_1448763754600_0004 running in uber mode : false
15/11/29 14:44:45 INFO mapreduce.Job:  map 0% reduce 0%
15/11/29 14:45:14 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:45:34 INFO mapreduce.Job:  map 100% reduce 100%
15/11/29 14:45:34 INFO mapreduce.Job: Job job_1448763754600_0004 completed successfully
15/11/29 14:45:34 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=254
		FILE: Number of bytes written=216031
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=277
		HDFS: Number of bytes written=107
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=27010
		Total time spent by all reduces in occupied slots (ms)=16429
		Total time spent by all map tasks (ms)=27010
		Total time spent by all reduce tasks (ms)=16429
		Total vcore-seconds taken by all map tasks=27010
		Total vcore-seconds taken by all reduce tasks=16429
		Total megabyte-seconds taken by all map tasks=27658240
		Total megabyte-seconds taken by all reduce tasks=16823296
	Map-Reduce Framework
		Map input records=8
		Map output records=8
		Map output bytes=232
		Map output materialized bytes=254
		Input split bytes=103
		Combine input records=0
		Combine output records=0
		Reduce input groups=5
		Reduce shuffle bytes=254
		Reduce input records=8
		Reduce output records=5
		Spilled Records=16
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=167
		CPU time spent (ms)=2320
		Physical memory (bytes) snapshot=304205824
		Virtual memory (bytes) snapshot=1695969280
		Total committed heap usage (bytes)=136450048
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=174
	File Output Format Counters
		Bytes Written=107
[hadoop@nnode code]$
Viewing the results
[hadoop@nnode code]$ hdfs dfs -ls /201511291404
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-29 14:45 /201511291404/_SUCCESS
-rw-r--r--   2 hadoop hadoop        107 2015-11-29 14:45 /201511291404/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511291404/part-r-00000
信息安全	20	3156
搜索引擎	112	24248
站点统计	27	8898
综合门户	15	1938
视频网站	15	1527
[hadoop@nnode code]$
Note: the first execution failed with the following error:
15/11/29 14:41:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:42:04 INFO mapreduce.Job: Task Id : attempt_1448763754600_0003_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
	at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
	at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
	at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
	at java.lang.Class.getConstructor0(Class.java:2892)
	at java.lang.Class.getDeclaredConstructor(Class.java:2058)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
	... 13 more
Searching online turned up the cause: "When defining a custom Writable, note that the no-argument constructor is invoked during reflection, so the class must define a no-argument constructor."
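The stack trace above points at Class.getDeclaredConstructor() inside ReflectionUtils.newInstance(). The following is a rough sketch of that code path (my own simplification, not the real ReflectionUtils source, which also caches constructors and injects the Configuration); it is exactly this call that fails when a class only has parameterized constructors:

// Simplified sketch of the reflective instantiation the reducer's deserializer performs.
import java.lang.reflect.Constructor;

import com.lucl.hadoop.mapreduce.serialize.CustomizeWritable;  // the class defined above

public class NewInstanceSketch {
    static <T> T newInstance(Class<T> clazz) {
        try {
            // Throws NoSuchMethodException if the class has no no-argument constructor.
            Constructor<T> ctor = clazz.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Succeeds because CustomizeWritable declares a public no-argument constructor;
        // remove that constructor and this call fails just like the reduce task did.
        CustomizeWritable w = newInstance(CustomizeWritable.class);
        System.out.println(w.getClass().getName());
    }
}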