hadoop2.2.0如何定制mapreduce输出到数据库
发表于:2024-10-22 作者:千家信息网编辑
千家信息网最后更新 2024年10月22日,今天就跟大家聊聊有关hadoop2.2.0如何定制mapreduce输出到数据库,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。hadoop2.
千家信息网最后更新 2024年10月22日hadoop2.2.0如何定制mapreduce输出到数据库
今天就跟大家聊聊有关hadoop2.2.0如何定制mapreduce输出到数据库,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
hadoop2.2.0定制mapreduce输出到数据库:
这里以redis数据库为例。
这里的例子是,我想统计日志文件中的某天各个小时的访问量,日志格式为:
2014-02-10 04:52:34 127.0.0.1 xxx
我们知道在写mapreduce job时,要配置输入输出,然后编写mapper和reducer类,hadoop默认输出是到hdfs的文件中,例如:
job.setOutputFormatClass(FileOutputFormat.class);
现在我们想要将任务计算结果输出到数据库(redis)中,怎么做呢?可以继承FileOutputFormat类,定制自己的类,看代码:
public class LoginLogOutputFormatextends FileOutputFormat { /** * 重点也是定制一个RecordWriter类,每一条reduce处理后的记录,我们便可将该记录输出到数据库中 */ protected static class RedisRecordWriter extends RecordWriter { private Jedis jedis; //redis的client实例 public RedisRecordWriter(Jedis jedis){ this.jedis = jedis; } @Override public void write(K key, V value) throws IOException, InterruptedException { boolean nullKey = key == null; boolean nullValue = value == null; if (nullKey || nullValue) return; String[] sKey = key.toString().split("-"); String outKey = sKey[0]+"-"+sKey[1]+"-"+sKey[2]+"_login_stat"; //zset key为yyyy-MM-dd_login_stat jedis.zadd(outKey.getBytes("UTF-8"), -1, (sKey[3]+":"+value).getBytes("UTF-8")); //zadd, 其值格式为: 时刻:访问量 } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (jedis != null) jedis.disconnect(); //关闭链接 } } @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Jedis jedis = RedisClient.newJedis(); //构建一个redis,这里你可以自己根据实际情况来构建数据库连接对象 //System.out.println("构建RedisRecordWriter"); return new RedisRecordWriter (jedis); }}
下面就是整个job实现:
public class LoginLogStatTask extends Configured implements Tool { public static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (value == null || "".equals(value)) return; // 解析value,如: 2014-02-10 04:52:34 127.0.0.1 xxx String[] fields = value.toString().split(" "); String date = fields[0]; String time = fields[1]; String hour = time.split(":")[0]; String outKey = date+"-"+hour; context.write(new Text(outKey), new IntWritable(1)); } } public static class MyReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; while (values.iterator().hasNext()){ //统计数量 count ++; values.iterator().next(); } context.write(key, new IntWritable(count)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); List inputs = new ArrayList<>(); String inputPath = args[0]; if (inputPath.endsWith("/")){ //如果是目录 inputs.addAll(HdfsUtil.listFiles(inputPath, conf)); } else{ //如果是文件 inputs.add(new Path(inputPath)); } long ts = System.currentTimeMillis(); String jobName = "login_logs_stat_job_" + ts; Job job = Job.getInstance(conf, jobName); job.setJarByClass(LoginLogStatTask.class); //添加输入文件路径 for (Path p : inputs){ FileInputFormat.addInputPath(job, p); } //设置输出路径 Path out = new Path(jobName + ".out"); //以jobName.out作为输出 FileOutputFormat.setOutputPath(job, out); //设置mapper job.setMapperClass(MyMapper.class); //设置reducer job.setReducerClass(MyReducer.class); //设置输入格式 job.setInputFormatClass(TextInputFormat.class); //设置输出格式 job.setOutputFormatClass(LoginLogOutputFormat.class); //设置输出key类型 job.setOutputKeyClass(Text.class); //设置输出value类型 job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new LoginLogStatTask(), args); System.exit(res); }
运行job后,就会在redis数据库中有对应的key:
看完上述内容,你们对hadoop2.2.0如何定制mapreduce输出到数据库有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
输出
数据
数据库
文件
格式
内容
输入
日志
类型
访问量
路径
UTF-8
统计
代码
任务
例子
实例
实际
对象
小时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中移二期中标数据库
电子商务员软件开发
微软的补丁管理服务器
三台服务器
千锋网络安全学习
求职数据库
数据库的表设计不能修改怎么办
达梦数据库开发接口
网络安全角色
大数据网络技术教学
网络安全与大数据理学
实施网络安全工程 加强
怎么样让网络安全意识提高
钢铁企业工控网络安全会议
检索pdb数据库的软件
管理服务器开机不启动
辽宁特色软件开发项目信息
怎么判断数据库是open
江西宝粒网络技术有限公司
苏州常规软件开发哪家好
我的世界服务器地狱版
从化app软件开发多少钱
杨浦区网络技术转让代理品牌
asp.net 云服务器
商业银行数据库的安全措施
把服务器挂到云安全吗
应用存储和其他服务器的区别
网络安全密罐
数据库原理第一堂课
科技互联网公司法律顾问