如何利用MapReduce分析明星微博数据
发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,这篇文章主要介绍了如何利用MapReduce分析明星微博数据,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1、项目需求自定义输入格式,
千家信息网最后更新 2024年11月20日如何利用MapReduce分析明星微博数据
这篇文章主要介绍了如何利用MapReduce分析明星微博数据,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
1、项目需求
自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中。
2、数据集
明星 明星微博名称 粉丝数 关注数 微博数
俞灏明 俞灏明 10591367 206 558
李敏镐 李敏镐 22898071 11 268
林心如 林心如 57488649 214 5940
黄晓明 黄晓明 22616497 506 2011
张靓颖 张靓颖 27878708 238 3846
李娜 李娜 23309493 81 631
徐小平 徐小平 11659926 1929 13795
唐嫣 唐嫣 24301532 200 2391
有斐君 有斐君 8779383 577 4251
3、分析
自定义InputFormat读取明星微博数据,通过自定义getSortedHashtableByValue方法分别对明星的fan、followers、microblogs数据进行排序,然后利用MultipleOutputs输出不同项到不同的文件中
4、实现
1)、定义WeiBo实体类,实现WritableComparable接口
package com.buaa; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiBo * @Description TODO * @Author 刘吉超 * @Date 2016-05-07 14:54:29 */ public class WeiBo implements WritableComparable
2)、自定义WeiboInputFormat,继承FileInputFormat抽象类
package com.buaa; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; /** * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboInputFormat * @Description TODO * @Author 刘吉超 * @Date 2016-05-07 10:23:28 */ public class WeiboInputFormat extends FileInputFormat{ @Override public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // 自定义WeiboRecordReader类,按行读取 return new WeiboRecordReader(); } public class WeiboRecordReader extends RecordReader { public LineReader in; // 声明key类型 public Text lineKey = new Text(); // 声明 value类型 public WeiBo lineValue = new WeiBo(); @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { // 获取split FileSplit split = (FileSplit)input; // 获取配置 Configuration job = context.getConfiguration(); // 分片路径 Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); // 打开文件 FSDataInputStream filein = fs.open(file); in = new LineReader(filein,job); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // 一行数据 Text line = new Text(); int linesize = in.readLine(line); if(linesize == 0) return false; // 通过分隔符'\t',将每行的数据解析成数组 String[] pieces = line.toString().split("\t"); if(pieces.length != 5){ throw new IOException("Invalid record received"); } int a,b,c; try{ // 粉丝 a = Integer.parseInt(pieces[2].trim()); // 关注 b = Integer.parseInt(pieces[3].trim()); // 微博数 c = Integer.parseInt(pieces[4].trim()); }catch(NumberFormatException nfe){ throw new IOException("Error parsing floating poing value in record"); } //自定义key和value值 lineKey.set(pieces[0]); lineValue.set(a, b, c); return true; } @Override public void close() throws IOException { if(in != null){ in.close(); } } @Override public Text getCurrentKey() throws IOException, InterruptedException { return lineKey; } @Override public WeiBo getCurrentValue() throws IOException, InterruptedException { return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } } }
3)、编写mr程序
package com.buaa; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboCount * @Description TODO * @Author 刘吉超 * @Date 2016-05-07 09:07:36 */ public class WeiboCount extends Configured implements Tool { // tab分隔符 private static String TAB_SEPARATOR = "\t"; // 粉丝 private static String FAN = "fan"; // 关注 private static String FOLLOWERS = "followers"; // 微博数 private static String MICROBLOGS = "microblogs"; public static class WeiBoMapper extends Mapper{ @Override protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException { // 粉丝 context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan())); // 关注 context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers())); // 微博数 context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs())); } } public static class WeiBoReducer extends Reducer { private MultipleOutputs mos; protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs (context); } protected void reduce(Text Key, Iterable Values,Context context) throws IOException, InterruptedException { Map map = new HashMap< String,Integer>(); for(Text value : Values){ // value = 名称 + (粉丝数 或 关注数 或 微博数) String[] records = value.toString().split(TAB_SEPARATOR); map.put(records[0], Integer.parseInt(records[1].toString())); } // 对Map内的数据进行排序 Map.Entry [] entries = getSortedHashtableByValue(map); for(int i = 0; i < entries.length;i++){ mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue()); } } protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 判断路径是否存在,如果存在,则删除 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } // 构造任务 Job job = new Job(conf, "weibo"); // 主类 job.setJarByClass(WeiboCount.class); // Mapper job.setMapperClass(WeiBoMapper.class); // Mapper key输出类型 job.setMapOutputKeyClass(Text.class); // Mapper value输出类型 job.setMapOutputValueClass(Text.class); // Reducer job.setReducerClass(WeiBoReducer.class); // Reducer key输出类型 job.setOutputKeyClass(Text.class); // Reducer value输出类型 job.setOutputValueClass(IntWritable.class); // 输入路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 自定义输入格式 job.setInputFormatClass(WeiboInputFormat.class) ; //自定义文件输出类别 MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class); // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); //提交任务 return job.waitForCompletion(true)?0:1; } // 对Map内的数据进行排序(只适合小数据量) @SuppressWarnings("unchecked") public static Entry [] getSortedHashtableByValue(Map h) { Entry [] entries = (Entry []) h.entrySet().toArray(new Entry[0]); // 排序 Arrays.sort(entries, new Comparator >() { public int compare(Entry entry1, Entry entry2) { return entry2.getValue().compareTo(entry1.getValue()); } }); return entries; } public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://ljc:9000/buaa/microblog/weibo.txt", "hdfs://ljc:9000/buaa/microblog/out/" }; int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0); System.exit(ec); } }
5、运行结果
感谢你能够认真阅读完这篇文章,希望小编分享的"如何利用MapReduce分析明星微博数据"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
数据
输出
明星
文件
粉丝
类型
排序
篇文章
路径
输入
分析
不同
方法
刘吉
任务
分隔符
名称
序列
林心如
格式
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
新手怎么挑选备案型腾讯云服务器
通河计算机网络技术
江苏网络安全隔离装置
编辑图片数据库
瑞金在线网络技术有限公司
校园网络安全情况工作机制
哪些企业会用同方知网数据库
众恒全华网络技术山东有限公司
新浪邮箱收件服务器
ibm服务器电源管理
网络安全教育小报标题
怎么返回数据库选中表格
市南区直播软件开发公司
ruby 服务器记录功能
计算机软件开发难度
怎么进入蜂鸟众包数据库
服务器上的硬盘台式电脑可以用吗
安装数据库时找不见盘符
有v300服务器吗
层次数据库模型特点
带gpu的云服务器
http代理服务器
浙江华为服务器虚拟化操作云主机
数据库以什么形式存储word
软件开发数据库概要设计
中邮人寿软件开发面试
数据库工程师等级
铁路局软件开发笔试
mc怎么建立服务器
网内共享文件服务器