Hadoop中如何分区
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!package partition;
千家信息网最后更新 2025年02月04日Hadoop中如何分区
小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
package partition;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class KpiApp { public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat"; public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format"; public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); existsFile(conf); Job job = new Job(conf, KpiApp.class.getName()); //打成Jar在Linux运行 job.setJarByClass(KpiApp.class); //1.1 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 job.setMapperClass(MyMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //1.3 自定义分区 job.setPartitionerClass(KpiPartition.class); job.setNumReduceTasks(2); //1.4 排序分组 //1.5 聚合 //2.1 //2.2 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //2.3 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } private static void existsFile(Configuration conf) throws IOException, URISyntaxException { FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf); if(fs.exists(new Path(OUTPUT_PATH))){ fs.delete(new Path(OUTPUT_PATH), true); } } static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String[] split = string.split("\t"); String phone = split[1]; Text key2 = new Text(); key2.set(phone); KpiWritable v2= new KpiWritable(); v2.set(split[6],split[7],split[8],split[9]); context.write(key2, v2); } } static class MyReducer extends Reducer { @Override protected void reduce(Text key2, Iterable values,Context context) throws IOException, InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for(KpiWritable writable : values){ upPackNum += writable.upPackNum; downPackNum += writable.downPackNum; upPayLoad += writable.upPayLoad; downPayLoad += writable.downPayLoad; } KpiWritable value3 = new KpiWritable(); value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)); context.write(key2, value3); } }}class KpiWritable implements Writable{ long upPackNum; long downPackNum; long upPayLoad; long downPayLoad; @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayLoad); } public void set(String string, String string2, String string3, String string4) { this.upPackNum = Long.parseLong(string); this.downPackNum = Long.parseLong(string2); this.upPayLoad = Long.parseLong(string3); this.downPayLoad = Long.parseLong(string4); } @Override public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } @Override public String toString() { return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; }}class KpiPartition extends Partitioner { @Override public int getPartition(Text key, KpiWritable value, int numPartitions) { String string = key.toString(); return string.length()==11?0:1; }}
Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。
HashPartitioner是MapReduce的默认Partitioner。
以上是"Hadoop中如何分区"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
不怎么
大部分
更多
知识
行业
资讯
资讯频道
频道
分组
参考
学习
帮助
排序
运行
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
阿拉德之怒服务器排名
服务器经验指令怎么弄
c 连接数据库增删改查封装
公司服务器会设置监控吗
服务器断电后通电能ping通吗
数据库软件 2017
网络安全实训报告留言系统
通信网络安全发展现状分析
服务器安全管理策略不足
服务器不定时重启怎么办
上海工程软件开发分类
fm 开档数据库
特朗普解雇美国网络安全
计算机网络安全应急响应
衢州天气预报软件开发
网络安全法学是什么
北京it软件开发哪家好
中国知网中国工具书全文数据库
展板筑牢网络安全防线
组态软件开发工程
安装数据库实例后
服务器管理员用什么软件
微笑数据库损坏卸载
关系云数据库服务
天津云服务器ecs 服务器
公安信息网络安全交流
设计实现学生选课数据库
服务器不定时重启怎么办
加服务器能提高电脑性能吗
家庭医生呼叫平台数据库