十二、MapReduce--mapjoin和reducejoin
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,一、map join1、适用场景:一张表很大,一张表很小2、解决方案:在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。3、具体方法:采用
千家信息网最后更新 2025年02月04日十二、MapReduce--mapjoin和reducejoin
一、map join
1、适用场景:
一张表很大,一张表很小
2、解决方案:
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。
3、具体方法:采用分布式缓存
(1)在mapper的setup阶段,将文件读取到缓存集合中
(2)在driver中加载缓存,job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点。
4、实例
//order.txt订单id 商品id 商品数量1001 01 11002 02 21003 03 31004 01 41005 02 51006 03 6//pd.txt商品id 商品名01 小米02 华为03 格力
要将order中的商品id替换为商品名称,缓存 pd.txt 这个小表
mapper:
package MapJoin;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.*;import java.util.HashMap;import java.util.Map;public class MapJoinMapper extends Mapper { Map productMap = new HashMap(); Text k = new Text(); /** * * 将 pd.txt加载到hashmap中,只加载一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader productReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("G:\\test\\A\\mapjoin\\pd.txt")))); String line; while (StringUtils.isNotEmpty(line = productReader.readLine())) { String[] fields = line.split("\t"); productMap.put(fields[0], fields[1]); } productReader.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String productName = productMap.get(fields[1]); k.set(fields[0] + "\t" + productName + "\t" + fields[2]); context.write(k, NullWritable.get()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); }}
driver:
package MapJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinDriver.class); job.setMapperClass(MapJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //将重复使用的小文件加载到缓存中 job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt")); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
二、reduce join
1、分析思路
通过将关联条件作为map的输出的key,也就是使用商品ID来作为key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
输入的数据和上面的map join一样,输出的结果也和上面的类似
bean:
package ReduceJoin;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructor@AllArgsConstructorpublic class OrderBean implements Writable { private String orderID; private String productID; private int amount; private String productName; private String flag; @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.orderID); dataOutput.writeUTF(this.productID); dataOutput.writeInt(this.amount); dataOutput.writeUTF(this.productName); dataOutput.writeUTF(this.flag); } @Override public void readFields(DataInput dataInput) throws IOException { this.orderID = dataInput.readUTF(); this.productID = dataInput.readUTF(); this.amount = dataInput.readInt(); this.productName = dataInput.readUTF(); this.flag = dataInput.readUTF(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.orderID); sb.append("\t"); sb.append(this.productName); sb.append("\t"); sb.append(this.amount); sb.append("\t"); sb.append(this.flag); return sb.toString(); }}
map:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class OrderMapper extends Mapper { Text k = new Text(); OrderBean v = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); FileSplit inputSplit = (FileSplit)context.getInputSplit(); String fileName = inputSplit.getPath().getName(); //将商品id作为map输出的key if (fileName.startsWith("order")) { k.set(fields[1]); v.setOrderID(fields[0]); v.setProductID(fields[1]); v.setAmount(Integer.parseInt(fields[2])); v.setFlag("0"); v.setProductName(""); } else { k.set(fields[0]); v.setOrderID(""); v.setAmount(0); v.setProductID(fields[0]); v.setProductName(fields[1]); v.setFlag("1"); } context.write(k, v); }}
reduce:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;public class OrderReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //key是productID,如果订单表和商品名称表的productID相同,则key相同,会merge在一起 // //reduce输出是将每个订单列表输出的 ArrayList orderBeans = new ArrayList<>(); OrderBean pdBean = new OrderBean(); OrderBean tmp = new OrderBean(); for(OrderBean bean : values) { if ("0".equals(bean.getFlag())) { try { BeanUtils.copyProperties(tmp, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmp); //orderBeans.add(bean); } else { //取出商品名称的KV try { BeanUtils.copyProperties(pdBean, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //获取当前的KV的productName,并输出 for (OrderBean o : orderBeans) { o.setProductName(pdBean.getProductName()); context.write(o, NullWritable.get()); } }}
driver:
package ReduceJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(OrderBean.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
商品
缓存
数据
输出
文件
名称
订单
相同
业务
条件
面的
很大
普通
也就是
信息
分布式
压力
场景
多张
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发效率评估方法
保定网络安全支队
动画渲染服务器怎么配
国家规定网络安全隐患等级
c 软件开发和后台开发
小网络安全周总结
网络技术实训室建设方案
丁少岩网络安全
广东城管通软件开发
数据库专业版
软件开发如何做分录
服务器离线状态
管理服务器远程
酒泉网络安全检查
网络安全宣传设计物品
电脑服务器需要清理吗
武汉烽火研究生软件开发工资
网络安全教育日简报
数据库增量更新开源
sql附加数据库文件哪里找
软件开发培训好机构
在我的世界里面玩第五人格服务器
曾经发生的网络安全时间
宜兴加工软件开发项目信息
53项网络安全标准有哪些
网络安全协会纪要
福州巨神网络技术有限公司
西藏高校党建软件开发
外国服务器推荐
五亿数据库快速导入