六、MapReduce排序例子--获取价格最高的商品信息
发表于:2024-10-17 作者:千家信息网编辑
千家信息网最后更新 2024年10月17日,1、需求获取每个订单中最贵的商品用到的知识点:自定义排序,包括普通排序,二次排序,分组排序自定义分区2、数据输入和输出格式数据输入格式:每个已售商品一条记录订单id 商品id 商品价格0000
千家信息网最后更新 2024年10月17日六、MapReduce排序例子--获取价格最高的商品信息
1、需求
获取每个订单中最贵的商品
用到的知识点:
自定义排序,包括普通排序,二次排序,分组排序
自定义分区
2、数据输入和输出格式
数据输入格式:
每个已售商品一条记录订单id 商品id 商品价格0000001 Pdt_01 222.80000002 Pdt_06 722.40000001 Pdt_05 25.80000003 Pdt_01 222.80000003 Pdt_01 33.80000002 Pdt_03 522.80000002 Pdt_04 122.4
数据输出格式:
每个订单一个文件,每个文件中显示各自订单最贵的一件商品的信息
3、分析
map阶段:
因为要求每个订单最贵的商品,所以必须根据订单号以及商品价格做二次排序。后面将订单号、商品id,商品价格组合成一个bean对象,作为key,作为map的输出。
自定义分区:
我们的需求是统计出同一订单中,最贵的商品,那么这就要求同一订单的所有商品条目都必须落在同一分区中(这里分区数大于1)才能统计处理,如果在不同分区中,那么是无法统计的,因为不用reduce之间是没有关联的。这里实现方式就是自定义分区,采用订单ID来分区,这样同一订单ID的商品条目就都落在同一个分区中了。而且在map输出自动根据订单id分区的过程中,对key先按照id和price排序,这样其实就是对同一订单的商品中,按照商品价格进行了排序了。
reduce阶段:
前面map输出的数据已经是每个订单中对商品价格进行了排序,在第一个的商品就是该订单中价格最高的商品,后面这里其实只需要取出第一个KV即可。利用自定义group分组排序,将同一订单ID但是不同的商品的KV聚合成一组,因为事实上每组KV的key是不同,而分组中的key是以第一个进入该分组的KV的key为准的,而第一个进入该分组的KV其实就是前面map排序之后得到的同一订单中价格最高的商品的key,所以将其输出即可。
4、代码实现
OrderBean
package GroupOrder;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Setter@Getter@NoArgsConstructor@AllArgsConstructorpublic class OrderBean implements WritableComparable { private int ID; private String productID; private double price; /** 二次排序:先根据id排序,如果相同,则根据商品价格排序 */ @Override public int compareTo(OrderBean o) { if (this.ID > o.getID()) { return 1; } else if (this.ID < o.getID()){ return -1; } else { return this.price > o.getPrice() ? -1 : 1; } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.ID); dataOutput.writeDouble(this.price); dataOutput.writeUTF(this.productID); } @Override public void readFields(DataInput dataInput) throws IOException { this.ID = dataInput.readInt(); this.price = dataInput.readDouble(); this.productID = dataInput.readUTF(); } @Override public String toString() { return this.ID + "\t" + this.productID + "\t" + this.price; //return this.ID + "\t" + this.price; }}
map
package GroupOrder;import org.apache.avro.Schema;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class OrderMapper extends Mapper { OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); k.setID(Integer.parseInt(fields[0])); k.setProductID(fields[1]); k.setPrice(Double.parseDouble(fields[2])); context.write(k, NullWritable.get()); }}
partitioner
package GroupOrder;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Partitioner;public class OrderPartitioner extends Partitioner { //根据订单id进行分区 @Override public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) { return (orderBean.getID() & Integer.MAX_VALUE) % i; }}
reduce
package GroupOrder;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class OrderReducer extends Reducer { @Override protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); }}
groupCompartor
package GroupOrder;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * 定制reduce前group的分组依据 * */public class OrderGroupCompartor extends WritableComparator { protected OrderGroupCompartor() { super(OrderBean.class, true); } /** * 以orderbean对象中的ID为分组依据。 * 同一ID的认为是同一个group,一个group只会调用一次reduce * * @param a * @param b * @return */ @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aOrderBean = (OrderBean) a; OrderBean bOrderBean = (OrderBean) b; if (aOrderBean.getID() > bOrderBean.getID()) { return 1; } else if (aOrderBean.getID() < bOrderBean.getID()) { return -1; } else { return 0; } }}
driver
package GroupOrder;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\GroupingComparator.txt", "G:\\test\\A\\comparator6\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); //设置分区实现类 job.setPartitionerClass(OrderPartitioner.class); job.setNumReduceTasks(3); //设置group的实现类 job.setGroupingComparatorClass(OrderGroupCompartor.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
商品
订单
排序
价格
分组
商品价格
输出
就是
数据
不同
格式
统计
最高
对象
文件
条目
订单号
阶段
需求
输入
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发 咨询
网络安全等级保护机构人员要求
如何把源文件导入到数据库
软件开发过程与管理实训个人总结
区块链网络安全应用方法
租虚拟服务器玩游戏
凉风垭服务器可以给货车加水吗
做公司网络安全不能干的太认真
成都天财在线网络技术有限公司
山西智能软件开发在线咨询
基岩版服务器管理权限
服务器安全等级A B C
对接港交所数据库需要条件
网络安全攻防的手法
恐龙岛吃鸡服务器
内网crt怎么登陆服务器
数据库的隔离性怎么保证
sql数据库网址
数据库rollup函数
呼市软件开发企业
无线网络技术的发展应用
深圳sp3服务器散热器价格
软件开发是哪些专业
大数据与网络安全申论
网络安全漏洞攻防
服务器管理员叫什么
腾讯投屏无法访问服务器
巨人通力电梯清故障用服务器吗
蓝晖网络技术有限公司
wicc 服务器指的是什么