hadoop2.2.0如何定制mapreduce输出到数据库
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,今天就跟大家聊聊有关hadoop2.2.0如何定制mapreduce输出到数据库,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。hadoop2.
千家信息网最后更新 2025年02月04日hadoop2.2.0如何定制mapreduce输出到数据库
今天就跟大家聊聊有关hadoop2.2.0如何定制mapreduce输出到数据库,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
hadoop2.2.0定制mapreduce输出到数据库:
这里以redis数据库为例。
这里的例子是,我想统计日志文件中的某天各个小时的访问量,日志格式为:
2014-02-10 04:52:34 127.0.0.1 xxx
我们知道在写mapreduce job时,要配置输入输出,然后编写mapper和reducer类,hadoop默认输出是到hdfs的文件中,例如:
job.setOutputFormatClass(FileOutputFormat.class);
现在我们想要将任务计算结果输出到数据库(redis)中,怎么做呢?可以继承FileOutputFormat类,定制自己的类,看代码:
public class LoginLogOutputFormatextends FileOutputFormat { /** * 重点也是定制一个RecordWriter类,每一条reduce处理后的记录,我们便可将该记录输出到数据库中 */ protected static class RedisRecordWriter extends RecordWriter { private Jedis jedis; //redis的client实例 public RedisRecordWriter(Jedis jedis){ this.jedis = jedis; } @Override public void write(K key, V value) throws IOException, InterruptedException { boolean nullKey = key == null; boolean nullValue = value == null; if (nullKey || nullValue) return; String[] sKey = key.toString().split("-"); String outKey = sKey[0]+"-"+sKey[1]+"-"+sKey[2]+"_login_stat"; //zset key为yyyy-MM-dd_login_stat jedis.zadd(outKey.getBytes("UTF-8"), -1, (sKey[3]+":"+value).getBytes("UTF-8")); //zadd, 其值格式为: 时刻:访问量 } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (jedis != null) jedis.disconnect(); //关闭链接 } } @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Jedis jedis = RedisClient.newJedis(); //构建一个redis,这里你可以自己根据实际情况来构建数据库连接对象 //System.out.println("构建RedisRecordWriter"); return new RedisRecordWriter (jedis); }}
下面就是整个job实现:
public class LoginLogStatTask extends Configured implements Tool { public static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (value == null || "".equals(value)) return; // 解析value,如: 2014-02-10 04:52:34 127.0.0.1 xxx String[] fields = value.toString().split(" "); String date = fields[0]; String time = fields[1]; String hour = time.split(":")[0]; String outKey = date+"-"+hour; context.write(new Text(outKey), new IntWritable(1)); } } public static class MyReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; while (values.iterator().hasNext()){ //统计数量 count ++; values.iterator().next(); } context.write(key, new IntWritable(count)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); List inputs = new ArrayList<>(); String inputPath = args[0]; if (inputPath.endsWith("/")){ //如果是目录 inputs.addAll(HdfsUtil.listFiles(inputPath, conf)); } else{ //如果是文件 inputs.add(new Path(inputPath)); } long ts = System.currentTimeMillis(); String jobName = "login_logs_stat_job_" + ts; Job job = Job.getInstance(conf, jobName); job.setJarByClass(LoginLogStatTask.class); //添加输入文件路径 for (Path p : inputs){ FileInputFormat.addInputPath(job, p); } //设置输出路径 Path out = new Path(jobName + ".out"); //以jobName.out作为输出 FileOutputFormat.setOutputPath(job, out); //设置mapper job.setMapperClass(MyMapper.class); //设置reducer job.setReducerClass(MyReducer.class); //设置输入格式 job.setInputFormatClass(TextInputFormat.class); //设置输出格式 job.setOutputFormatClass(LoginLogOutputFormat.class); //设置输出key类型 job.setOutputKeyClass(Text.class); //设置输出value类型 job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new LoginLogStatTask(), args); System.exit(res); }
运行job后,就会在redis数据库中有对应的key:
看完上述内容,你们对hadoop2.2.0如何定制mapreduce输出到数据库有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
输出
数据
数据库
文件
格式
内容
输入
日志
类型
访问量
路径
UTF-8
统计
代码
任务
例子
实例
实际
对象
小时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器运行慢变得很卡怎么办
朗乾互联网科技有限公司
国家网络安全宣传活动计划
三大类网络安全技术
网络安全产值低
网络安全手抄报图片文案温柔
王者荣耀 服务器推荐
万德数据库怎么收藏指标
虹口区进口网络技术开发产品
保密及网络安全考试
现代网络技术与课堂教学
服务器 性能容量管理
数据库怎么关联表与表的字段
保障网络安全的产品有哪些
软件销售税率软件开发税率
中国网络安全重大事件
清空数据库表数据
数据库用代码创建高考成绩表
未成立网络安全机构处罚
普陀区车载网络技术创新服务
网络安全日的参与主体有哪些
iq 数据库 pdf
sp全称软件开发
完善数据库的意义
拉曼光谱 标准 数据库
我的世界原版服务器多人生存推荐
电脑本地数据库怎么进
逆战主播服务器
删除mysql数据库用户
数据库鸡兔同笼