六、MapReduce排序例子--获取价格最高的商品信息
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,1、需求获取每个订单中最贵的商品用到的知识点:自定义排序,包括普通排序,二次排序,分组排序自定义分区2、数据输入和输出格式数据输入格式:每个已售商品一条记录订单id 商品id 商品价格0000
千家信息网最后更新 2025年01月23日六、MapReduce排序例子--获取价格最高的商品信息
1、需求
获取每个订单中最贵的商品
用到的知识点:
自定义排序,包括普通排序,二次排序,分组排序
自定义分区
2、数据输入和输出格式
数据输入格式:
每个已售商品一条记录订单id 商品id 商品价格0000001 Pdt_01 222.80000002 Pdt_06 722.40000001 Pdt_05 25.80000003 Pdt_01 222.80000003 Pdt_01 33.80000002 Pdt_03 522.80000002 Pdt_04 122.4
数据输出格式:
每个订单一个文件,每个文件中显示各自订单最贵的一件商品的信息
3、分析
map阶段:
因为要求每个订单最贵的商品,所以必须根据订单号以及商品价格做二次排序。后面将订单号、商品id,商品价格组合成一个bean对象,作为key,作为map的输出。
自定义分区:
我们的需求是统计出同一订单中,最贵的商品,那么这就要求同一订单的所有商品条目都必须落在同一分区中(这里分区数大于1)才能统计处理,如果在不同分区中,那么是无法统计的,因为不用reduce之间是没有关联的。这里实现方式就是自定义分区,采用订单ID来分区,这样同一订单ID的商品条目就都落在同一个分区中了。而且在map输出自动根据订单id分区的过程中,对key先按照id和price排序,这样其实就是对同一订单的商品中,按照商品价格进行了排序了。
reduce阶段:
前面map输出的数据已经是每个订单中对商品价格进行了排序,在第一个的商品就是该订单中价格最高的商品,后面这里其实只需要取出第一个KV即可。利用自定义group分组排序,将同一订单ID但是不同的商品的KV聚合成一组,因为事实上每组KV的key是不同,而分组中的key是以第一个进入该分组的KV的key为准的,而第一个进入该分组的KV其实就是前面map排序之后得到的同一订单中价格最高的商品的key,所以将其输出即可。
4、代码实现
OrderBean
package GroupOrder;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Setter@Getter@NoArgsConstructor@AllArgsConstructorpublic class OrderBean implements WritableComparable { private int ID; private String productID; private double price; /** 二次排序:先根据id排序,如果相同,则根据商品价格排序 */ @Override public int compareTo(OrderBean o) { if (this.ID > o.getID()) { return 1; } else if (this.ID < o.getID()){ return -1; } else { return this.price > o.getPrice() ? -1 : 1; } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.ID); dataOutput.writeDouble(this.price); dataOutput.writeUTF(this.productID); } @Override public void readFields(DataInput dataInput) throws IOException { this.ID = dataInput.readInt(); this.price = dataInput.readDouble(); this.productID = dataInput.readUTF(); } @Override public String toString() { return this.ID + "\t" + this.productID + "\t" + this.price; //return this.ID + "\t" + this.price; }}
map
package GroupOrder;import org.apache.avro.Schema;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class OrderMapper extends Mapper { OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); k.setID(Integer.parseInt(fields[0])); k.setProductID(fields[1]); k.setPrice(Double.parseDouble(fields[2])); context.write(k, NullWritable.get()); }}
partitioner
package GroupOrder;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Partitioner;public class OrderPartitioner extends Partitioner { //根据订单id进行分区 @Override public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) { return (orderBean.getID() & Integer.MAX_VALUE) % i; }}
reduce
package GroupOrder;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class OrderReducer extends Reducer { @Override protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); }}
groupCompartor
package GroupOrder;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * 定制reduce前group的分组依据 * */public class OrderGroupCompartor extends WritableComparator { protected OrderGroupCompartor() { super(OrderBean.class, true); } /** * 以orderbean对象中的ID为分组依据。 * 同一ID的认为是同一个group,一个group只会调用一次reduce * * @param a * @param b * @return */ @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aOrderBean = (OrderBean) a; OrderBean bOrderBean = (OrderBean) b; if (aOrderBean.getID() > bOrderBean.getID()) { return 1; } else if (aOrderBean.getID() < bOrderBean.getID()) { return -1; } else { return 0; } }}
driver
package GroupOrder;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\GroupingComparator.txt", "G:\\test\\A\\comparator6\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); //设置分区实现类 job.setPartitionerClass(OrderPartitioner.class); job.setNumReduceTasks(3); //设置group的实现类 job.setGroupingComparatorClass(OrderGroupCompartor.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
商品
订单
排序
价格
分组
商品价格
输出
就是
数据
不同
格式
统计
最高
对象
文件
条目
订单号
阶段
需求
输入
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数控编程常用数据库
计算机网络安全领域的3a指
金山区管理软件开发内容
武汉丿aVa软件开发招聘
怎么重新搭web服务器
服务器无法打开exe
软件开发工具包括
测试加工费 软件开发
服务器硬盘哪个比较好
彝绣数据库
从事软件开发的印度人是什么种姓
3g网络技术构成
java数据库索引建立
kyqp4234服务器
网络技术上机题
数据库商品信息表
聚系列服务器是什么
信息网络安全期刊杂志
莆田山信网络技术有限公司
青海网络安全周
ipad如何选择网络安全性
北航网络安全考研科目
acess 数据库技术
岳阳软件开发培训价格
应用软件开发用哪种台式机好
信息网络安全期刊杂志
嘉定区标准网络技术服务参考价格
微企云服务器的应用
ps5国内现在有服务器了吗
wgs数据库