Hadoop中如何分区
发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!package partition;
千家信息网最后更新 2024年11月23日Hadoop中如何分区
小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
package partition;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class KpiApp { public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat"; public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format"; public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); existsFile(conf); Job job = new Job(conf, KpiApp.class.getName()); //打成Jar在Linux运行 job.setJarByClass(KpiApp.class); //1.1 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 job.setMapperClass(MyMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //1.3 自定义分区 job.setPartitionerClass(KpiPartition.class); job.setNumReduceTasks(2); //1.4 排序分组 //1.5 聚合 //2.1 //2.2 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //2.3 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } private static void existsFile(Configuration conf) throws IOException, URISyntaxException { FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf); if(fs.exists(new Path(OUTPUT_PATH))){ fs.delete(new Path(OUTPUT_PATH), true); } } static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String[] split = string.split("\t"); String phone = split[1]; Text key2 = new Text(); key2.set(phone); KpiWritable v2= new KpiWritable(); v2.set(split[6],split[7],split[8],split[9]); context.write(key2, v2); } } static class MyReducer extends Reducer { @Override protected void reduce(Text key2, Iterable values,Context context) throws IOException, InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for(KpiWritable writable : values){ upPackNum += writable.upPackNum; downPackNum += writable.downPackNum; upPayLoad += writable.upPayLoad; downPayLoad += writable.downPayLoad; } KpiWritable value3 = new KpiWritable(); value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)); context.write(key2, value3); } }}class KpiWritable implements Writable{ long upPackNum; long downPackNum; long upPayLoad; long downPayLoad; @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayLoad); } public void set(String string, String string2, String string3, String string4) { this.upPackNum = Long.parseLong(string); this.downPackNum = Long.parseLong(string2); this.upPayLoad = Long.parseLong(string3); this.downPayLoad = Long.parseLong(string4); } @Override public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } @Override public String toString() { return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; }}class KpiPartition extends Partitioner { @Override public int getPartition(Text key, KpiWritable value, int numPartitions) { String string = key.toString(); return string.length()==11?0:1; }}
Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。
HashPartitioner是MapReduce的默认Partitioner。
以上是"Hadoop中如何分区"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
不怎么
大部分
更多
知识
行业
资讯
资讯频道
频道
分组
参考
学习
帮助
排序
运行
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
没事找事网络技术有限公司
湖南省狼人网络技术有限公司
金铲铲之战可以转服务器么
2017网络技术
俄服失落的方舟哪个服务器好
vc进行软件开发的优点
美国中机网络安全公司
web服务器 作用
远江盛邦网络安全中国电建
mysql数据库主键修改
永恒之塔国服怀旧服的服务器在哪
apk软件开发工具下载
职工培训论文 网络技术
重庆数据库培训费用
菏泽软件开发服务
网络安全法为配套的法
网络安全事件六级
数据库system初始密码
领一年服务器
如何做好软件开发计划
网络安全工程师证书照片
ip地址怎么添加共享服务器
音乐库数据库系统
安徽益美丽家网络技术有限公司
华为手机云服务器下载教程
戴尔服务器r410的主板型号
数据库陈明忠
mac用于软件开发6
数据库去重复数据库
app软件开发工程师有哪些专业