十二、MapReduce--mapjoin和reducejoin
发表于:2024-11-13 作者:千家信息网编辑
千家信息网最后更新 2024年11月13日,一、map join1、适用场景:一张表很大,一张表很小2、解决方案:在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。3、具体方法:采用
千家信息网最后更新 2024年11月13日十二、MapReduce--mapjoin和reducejoin
一、map join
1、适用场景:
一张表很大,一张表很小
2、解决方案:
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端的数据压力,尽可能减少数据倾斜。
3、具体方法:采用分布式缓存
(1)在mapper的setup阶段,将文件读取到缓存集合中
(2)在driver中加载缓存,job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点。
4、实例
//order.txt订单id 商品id 商品数量1001 01 11002 02 21003 03 31004 01 41005 02 51006 03 6//pd.txt商品id 商品名01 小米02 华为03 格力
要将order中的商品id替换为商品名称,缓存 pd.txt 这个小表
mapper:
package MapJoin;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.*;import java.util.HashMap;import java.util.Map;public class MapJoinMapper extends Mapper { Map productMap = new HashMap(); Text k = new Text(); /** * * 将 pd.txt加载到hashmap中,只加载一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader productReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("G:\\test\\A\\mapjoin\\pd.txt")))); String line; while (StringUtils.isNotEmpty(line = productReader.readLine())) { String[] fields = line.split("\t"); productMap.put(fields[0], fields[1]); } productReader.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String productName = productMap.get(fields[1]); k.set(fields[0] + "\t" + productName + "\t" + fields[2]); context.write(k, NullWritable.get()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); }}
driver:
package MapJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinDriver.class); job.setMapperClass(MapJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //将重复使用的小文件加载到缓存中 job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt")); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
二、reduce join
1、分析思路
通过将关联条件作为map的输出的key,也就是使用商品ID来作为key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
输入的数据和上面的map join一样,输出的结果也和上面的类似
bean:
package ReduceJoin;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructor@AllArgsConstructorpublic class OrderBean implements Writable { private String orderID; private String productID; private int amount; private String productName; private String flag; @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.orderID); dataOutput.writeUTF(this.productID); dataOutput.writeInt(this.amount); dataOutput.writeUTF(this.productName); dataOutput.writeUTF(this.flag); } @Override public void readFields(DataInput dataInput) throws IOException { this.orderID = dataInput.readUTF(); this.productID = dataInput.readUTF(); this.amount = dataInput.readInt(); this.productName = dataInput.readUTF(); this.flag = dataInput.readUTF(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.orderID); sb.append("\t"); sb.append(this.productName); sb.append("\t"); sb.append(this.amount); sb.append("\t"); sb.append(this.flag); return sb.toString(); }}
map:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class OrderMapper extends Mapper { Text k = new Text(); OrderBean v = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); FileSplit inputSplit = (FileSplit)context.getInputSplit(); String fileName = inputSplit.getPath().getName(); //将商品id作为map输出的key if (fileName.startsWith("order")) { k.set(fields[1]); v.setOrderID(fields[0]); v.setProductID(fields[1]); v.setAmount(Integer.parseInt(fields[2])); v.setFlag("0"); v.setProductName(""); } else { k.set(fields[0]); v.setOrderID(""); v.setAmount(0); v.setProductID(fields[0]); v.setProductName(fields[1]); v.setFlag("1"); } context.write(k, v); }}
reduce:
package ReduceJoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;public class OrderReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //key是productID,如果订单表和商品名称表的productID相同,则key相同,会merge在一起 // //reduce输出是将每个订单列表输出的 ArrayList orderBeans = new ArrayList<>(); OrderBean pdBean = new OrderBean(); OrderBean tmp = new OrderBean(); for(OrderBean bean : values) { if ("0".equals(bean.getFlag())) { try { BeanUtils.copyProperties(tmp, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmp); //orderBeans.add(bean); } else { //取出商品名称的KV try { BeanUtils.copyProperties(pdBean, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //获取当前的KV的productName,并输出 for (OrderBean o : orderBeans) { o.setProductName(pdBean.getProductName()); context.write(o, NullWritable.get()); } }}
driver:
package ReduceJoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(OrderBean.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
商品
缓存
数据
输出
文件
名称
订单
相同
业务
条件
面的
很大
普通
也就是
信息
分布式
压力
场景
多张
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
华为VR软件开发中心南昌
武威app软件开发哪家好
KMS代替国密数据库
数据库字段类型包括备注吗
酒店网络安全管理处罚措施
甘肃网络安全论坛
数据库入侵容忍技术有哪两类
中小学网络安全知识竞赛题库
服务器网卡速率计算
润灵数据库说明书
域白果互联网科技有限公司
问的我网络安全技术举例说明
属于数据库实现阶段的工作的是
学软件开发对电脑配置
我的世界空岛战争服务器在哪下载
虚拟主机可以开游戏服务器吗
在数据库中比较两份名单
数据库心得体会认识
新服务器带外管理配置文件
软件开发公司技术部经理
反编译gps数据库
华为服务器租赁
高端数据库技术知识
网络安全办公区的意思
大力宣传校园网络安全
软件开发技术合同成本核算单
公司网络安全案例及分析
无锡电脑软件开发团队
手机开我的世界服务器
服务器卡顿怎么修