Hadoop如何实现辅助排序
发表于:2024-11-25 作者:千家信息网编辑
千家信息网最后更新 2024年11月25日,这篇文章主要为大家展示了"Hadoop如何实现辅助排序",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop如何实现辅助排序"这篇文章吧。1. 样例数
千家信息网最后更新 2024年11月25日Hadoop如何实现辅助排序
这篇文章主要为大家展示了"Hadoop如何实现辅助排序",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Hadoop如何实现辅助排序"这篇文章吧。
1. 样例数据
011990-99999 SIHCCAJAVRI012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111012650-99999 194903241800 78011990-99999 195005150700 0011990-99999 195005151200 22011990-99999 195005151800 -11
2. 需求
3. 思路、代码
将气象站ID相同的气象站信息和天气信息交由同一个 Reducer 处理,并保证气象站信息首先到达;然后 reduce() 函数从第一行中获取气象台名称,从第二行开始获取天气信息并输出。
import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.io.WritableUtils;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * 组合键,此例中用于辅助排序,包括气象站ID和"标记"。 * "标记"是一个虚拟字段,其唯一目的是对记录排序,使气象站的记录比天气记录先到达。 * 虽然可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况, * 因为其中任何一组的记录数量都可能非常庞大,远远超出 reducer 的可用内存量 */public class TextPair implements WritableComparable{ private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair tp = (TextPair) obj; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } // RawComparator 允许直接比较数据流中的记录,无须先把数据流反序列化为对象,这样避免了新建对象的额外开销 // WritableComparator 是对继承自 WritableComparable 类的 RawComparator 的一个通用实现。 public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // firstL1、firstL2 表示每个字节流中第一个 Text 字段的长度 int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } }}
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 标记气象站记录的 mapper */public class JoinStationMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new TextPair(val[0], "0"), new Text(val[1])); } }}
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 标记天气记录的 mapper */public class JoinRecordMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 3) { context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2])); } }}
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.util.Iterator;/** * 连接已标记的气象站记录和天气记录的 reducer */public class JoinReducer extends Reducer{ @Override protected void reduce(TextPair key, Iterable values, Context context) throws IOException, InterruptedException { Iterator iter = values.iterator(); Text stationName = new Text(iter.next()); // reducer 会先接收气象站记录(这里千万不能写成 Text stationName = iter.next(); ) while (iter.hasNext()) { Text record = iter.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); context.write(key.getFirst(), outValue); } }}
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class JoinRecordWithStationName { static class KeyPartitioner extends Partitioner{ @Override public int getPartition(TextPair textPair, Text text, int numPartitions) { return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Parameter number is wrong, please enter three parameters:
4. 运行结果
以上是"Hadoop如何实现辅助排序"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
气象
气象站
排序
天气
标记
辅助
信息
数据
处理
内容
篇文章
内存
函数
字段
对象
数据流
气象台
学习
帮助
相同
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
如何引用通达信自定义数据库
统计数据库的基本原则是什么
韩国服务器光算云.怎么走
如何在服务器中设置回话结束功能
播聘网络技术深圳有限公司
ipv6服务器租用
连无线网电视当前服务器异常
软件开发流程控制
csd数据库是什么
数据库入门培训短期班
广东摩羯座互联网科技有限公司
计算机网络技术大学推荐
阿里云服务器架设dnf
浪潮服务器安装系统时内存不足
数据库管理系统候选码
qq服务器上的聊天记录
晋中华为信息与网络技术学院
常州营销网络技术有哪些
数据库SQL的特性
软件开发叫张工
中大网络技术中心
黑虫网络技术有限公司
怀旧服可以转新服务器吗
怎么查找中国疾病大数据库
dell服务器手册
民政婚姻数据库联网了没
sw不能核实此服务器
打包好的数据库软件出现
软件开发换公司吗
分子 光谱数据库