MapReduce实现自定义分区的方法
发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,简介:mapreduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性,极大地方便了编程人员在不会分布式并行编程的情况下,将自己
千家信息网最后更新 2024年11月18日MapReduce实现自定义分区的方法
简介:
mapreduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。
它通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性,极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
MapReduce自带的分区器是HashPartitioner
原理:
先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走。
自定义分分区需要继承Partitioner
,复写getpariton()
方法
自定义分区类:
注意:map的输出是
其中int partitionIndex = dict.get(text.toString())
,partitionIndex
是获取K的值
附:被计算的的文本
Dear Dear Bear Bear River Car Dear Dear Bear RiveDear Dear Bear Bear River Car Dear Dear Bear Rive
需要在main函数中设置,指定自定义分区类
自定义分区类:
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;import java.util.HashMap;public class CustomPartitioner extends Partitioner { public static HashMap dict = new HashMap(); //Text代表着map阶段输出的key,IntWritable代表着输出的值 static{ dict.put("Dear", 0); dict.put("Bear", 1); dict.put("River", 2); dict.put("Car", 3); } public int getPartition(Text text, IntWritable intWritable, int i) { // int partitionIndex = dict.get(text.toString()); return partitionIndex; }}
注意:map的输出结果是键值对int partitionIndex = dict.get(text.toString());
中的partitionIndex
是map输出键值对中的键的值,也就是K的值。
Maper类:
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMap extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { // 每个单词出现1次,作为中间结果输出 context.write(new Text(word), new IntWritable(1)); } }}
Reducer类:
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMap extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { // 每个单词出现1次,作为中间结果输出 context.write(new Text(word), new IntWritable(1)); } }}
main函数:
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length != 2 || args == null) { System.out.println("please input Path!"); System.exit(0); } Configuration configuration = new Configuration(); configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar"); Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName()); // 打jar包 job.setJarByClass(WordCountMain.class); // 通过job设置输入/输出格式 //job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); // 设置输入/输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置处理Map/Reduce阶段的类 job.setMapperClass(WordCountMap.class); //map combine //job.setCombinerClass(WordCountReduce.class); job.setReducerClass(WordCountReduce.class); //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型 //job.setMapOutputKeyClass(.class) // 设置最终输出key/value的类型m job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setPartitionerClass(CustomPartitioner.class); job.setNumReduceTasks(4); // 提交作业 job.waitForCompletion(true); }}
main函数参数设置:
输出
结果
函数
类型
编程
代表
分布式
单词
大规模
数据
阶段
大规
输入
方法
一致
个数
也就是
人员
任务
原理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
更换主机 连接不上数据库
上古卷轴5登录服务器禁用
数据库按什么键
全球上市网络安全企业
徐州云计算服务器
局域网忽然访问不到服务器了
陌陌聊天记录是否上传服务器
做软件开发培训
内蒙数据库怎么注册
三管齐下ai赋能网络安全
主域名和子域名绑定不同服务器
网络安全服务规则
濮阳 软件开发
mc的服务器怎么装模组
斗罗大陆魂师对决都有哪些服务器
激光电视连接不上服务器怎么回事
网络上的传真服务器
物流网络技术 孙甲泉
网络安全管理定义
网络安全大佬薪资
网络安全防护名词
明光加工软件开发技术代理商
泡泡糖游戏服务器连不上
旗瀚科技 软件开发
快狗打车的软件开发代码
软件开发基本知识点
数据库技术与应用查题
网络安全5个等级逐级递增
中国高等植物数据库
服务器远程端口号查询