HBase-1.0.1学习笔记(四)MapReduce操作HBase
发表于:2024-10-27 作者:千家信息网编辑
千家信息网最后更新 2024年10月27日,鲁春利的工作笔记,谁说程序员不能有文艺范?环境:hadoop-2.6.0hbase-1.0.1zookeeper-3.4.61、Hadoop集群配置过程略;2、Zookeeper集群配置过程略;3、H
千家信息网最后更新 2024年10月27日HBase-1.0.1学习笔记(四)MapReduce操作HBase
鲁春利的工作笔记,谁说程序员不能有文艺范?
环境:
hadoop-2.6.0
hbase-1.0.1
zookeeper-3.4.6
1、Hadoop集群配置过程略;
2、Zookeeper集群配置过程略;
3、HBase集群配置过程略;
4、HBase作为输入源示例
查看当前hbase表m_domain中的数据
[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit " to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]

hbase(main):002:0> scan 'm_domain'
ROW COLUMN+CELL
alibaba.com_19990415_20220523 column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
alibaba.com_19990415_20220523 column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
alibaba.com_19990415_20220523 column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
alibaba.com_19990415_20220523 column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
alibaba.com_19990415_20220523 column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
baidu.com_19991011_20151011 column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
baidu.com_19991011_20151011 column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
baidu.com_19991011_20151011 column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
baidu.com_19991011_20151011 column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit
实现Mapper端
package com.invic.mapreduce.hbase.source;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * Mapper that reads rows of an HBase table and emits one text record per row.
 *
 * <p>{@link TableMapper} extends Hadoop's {@code Mapper}; mappers that use HBase as their
 * input source extend it. The input key is the row key and the input value is the
 * {@link Result} holding the cells of that row. The output key is the row key as text and
 * the output value is the concatenation of {@code family.qualifier:value} for every cell.
 *
 * @author lucl
 */
public class HBaseReaderMapper extends TableMapper<Text, Text> {

    /** Column family read by the "family is known" demonstration. */
    private static final byte[] FAMILY = Bytes.toBytes("cf");

    /** Reused output key, so no new object is allocated per row. */
    private final Text key = new Text();

    /** Reused output value, so no new object is allocated per row. */
    private final Text value = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    /**
     * Prints the cells of family {@code cf} to stdout and writes one record for the row.
     *
     * @param row     row key of the current HBase row
     * @param result  cells of the current row
     * @param context task context used to emit the output record
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        // An empty Result has no cells to print or emit, so there is nothing to do.
        if (result.isEmpty()) {
            return;
        }

        // Variant 1: the column family is known up front.
        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(FAMILY);
        if (familyMap != null) {
            for (Entry<byte[], byte[]> entry : familyMap.entrySet()) {
                String columnQualifier = Bytes.toString(entry.getKey());
                String cellValue = Bytes.toString(entry.getValue());
                System.out.println(columnQualifier + "\t" + cellValue);
            }
        }

        // Variant 2: several families exist, or the family names are not known.
        // Honour offset/length: the writable may wrap a slice of a larger array.
        String rowKey = Bytes.toString(row.get(), row.getOffset(), row.getLength());
        StringBuilder cells = new StringBuilder(1024);
        for (Cell cell : result.listCells()) {
            cells.append(Bytes.toString(CellUtil.cloneFamily(cell)));
            cells.append(".");
            cells.append(Bytes.toString(CellUtil.cloneQualifier(cell)));
            cells.append(":");
            cells.append(Bytes.toString(CellUtil.cloneValue(cell)));
        }

        key.set(rowKey);
        value.set(cells.toString());
        context.write(key, value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
实现MapReduce的Driver类
package com.invic.mapreduce.hbase.source;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * * @author lucl * HBase作为输入源示例 * */public class HBaseASDataSourceDriver extends Configured implements Tool { /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\"); int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args); System.out.println("receive exit : " + exit); } @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); // hadoop的参数配置 /*conf.set("fs.defaultFS", "hdfs://cluster"); conf.set("dfs.nameservices", "cluster"); conf.set("dfs.ha.namenodes.cluster", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020"); conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020"); conf.set("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/ // hbase master // property "hbase.master" has been deprecated since 0.90 // Just passing the ZK configuration makes your client auto-discover the master // conf.set("hbase.master", "nnode:60000"); // zookeeper quorum getConf().set("hbase.zookeeper.property.clientport", "2181"); getConf().set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2"); // 是否对Map Task启用推测执行机制 getConf().setBoolean("mapreduce.map.speculative", false); // 是否对Reduce 
Task启用推测执行机制 getConf().setBoolean("mapreduce.reduce.speculative", false); Job job = Job.getInstance(conf); job.setJobName("MyBaseReaderFromHBase"); job.setJarByClass(HBaseASDataSourceDriver.class); job.setOutputFormatClass(TextOutputFormat.class); /** * 从HBase读取数据时数据会传给下面定义的Mapper来,在Mapper类中进行了数据的处理 * 由于在job中未指定Reducer类,会调用默认的Reducer类来将Mapper的输出原封不动的写入; * 如果需要在Reducer中再做些其他的单独的处理,则可以自定义Reducer类再做些处理。 */ Scan scan = new Scan(); // scan.addFamily(family); // scan.addColumn(family, qualifier); byte [] tableName = Bytes.toBytes("m_domain"); TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class, Text.class, Text.class, job); Path path = new Path("/" + System.currentTimeMillis()); FileOutputFormat.setOutputPath(job, path); return job.waitForCompletion(true) ? 0 : 1; } }
查看结果:
问题记录:
a. 通过Eclipse执行时报错,但未分析出原因
b. 放到集群环境中运行时Mapper类如果定义在Driver类中,则报错
ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()
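c. 日志中显示的ZooKeeper连接串总是127.0.0.1,而非配置的hbase.zookeeper.quorum。
原因:如果把这些参数设置在getConf()返回的Configuration上,而Job使用的却是另行通过HBaseConfiguration.create()创建的conf,那么两者不是同一个对象,设置不会生效,客户端只能使用默认的本机地址;应当使用HBaseConfiguration.create(getConf())创建conf,并把参数设置在这个conf上。另外端口的属性名区分大小写,应为hbase.zookeeper.property.clientPort。
如果ZooKeeper集群与HBase不在同一批机器上,这种情况下客户端将无法连接到ZooKeeper。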
5、HBase作为输出源示例
文本文件内容如下:
2013-09-13 16:04:08 www.subnetc1.com 192.168.1.7 80 192.168.1.139 18863 HTTP www.subnetc1.com/index.html
2013-09-13 16:04:08 www.subnetc2.com 192.168.1.7 80 192.168.1.159 14100 HTTP www.subnetc2.com/index.html
2013-09-13 16:04:08 www.subnetc3.com 192.168.1.7 80 192.168.1.130 4927 HTTP www.subnetc3.com/index.html
2013-09-13 16:04:08 www.subnetc4.com 192.168.1.7 80 192.168.1.154 39044 HTTP www.subnetc4.com/index.html
Map端代码:
package com.invic.mapreduce.hbase.target;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MyMapper extends Mapper
Reducer端代码:
package com.invic.mapreduce.hbase.target;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * Reducer that writes one HBase row per key: the row key is the reduce key and column
 * {@code cf:detail} holds the concatenation of all values received for that key.
 *
 * @author lucl
 */
public class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    /** Column family written by this reducer. */
    private static final byte[] FAMILY = Bytes.toBytes("cf");

    /** Qualifier of the column holding the concatenated values. */
    private static final byte[] DETAIL = Bytes.toBytes("detail");

    /**
     * Concatenates the values of one key and emits them as a single {@link Put}.
     *
     * @param key     reduce key, used as the HBase row key
     * @param value   all values grouped under {@code key}
     * @param context task context used to emit the {@link Put}
     * @throws IOException          if writing the {@link Put} fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Do NOT use key.getBytes() as the row key: it returns the Text's backing array,
        // which is only valid up to getLength(). The framework reuses the key object, so
        // the array can still carry trailing bytes of a previous, longer key.
        byte[] rowKey = Bytes.toBytes(key.toString());

        /*
         * Word-count variant: extend TableReducer<Text, IntWritable, ImmutableBytesWritable>
         * and take Iterable<IntWritable> instead.
         *
         *   int sum = 0;
         *   for (IntWritable val : value) {
         *       sum += val.get();
         *   }
         *   Put put = new Put(rowKey);
         *   // Store the count as a string; the raw int bytes are not readable in the shell.
         *   put.addColumn(FAMILY, Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
         *   context.write(new ImmutableBytesWritable(rowKey), put);
         */

        // Multi-column variant: join all values of this key into one cell.
        StringBuilder detail = new StringBuilder();
        for (Text text : value) {
            detail.append(text.toString());
        }

        Put put = new Put(rowKey);
        put.addColumn(FAMILY, DETAIL, Bytes.toBytes(detail.toString()));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
Driver驱动类:
package com.invic.mapreduce.hbase.target;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * * @author lucl * HBase作为输出源示例 * */public class HBaseASDataTargetDriver extends Configured implements Tool { private static final String TABLE_NAME = "t_inter_log"; private static final String COLUMN_FAMILY_NAME = "cf"; /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // for eclipse // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\"); int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args); System.out.println("receive exit : " + exit); } @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(getConf()); // hadoop的参数配置 conf.set("fs.defaultFS", "hdfs://cluster"); conf.set("dfs.nameservices", "cluster"); conf.set("dfs.ha.namenodes.cluster", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020"); conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020"); conf.set("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); // hbase master // property "hbase.master" has been deprecated since 0.90 // Just passing the ZK configuration makes your client auto-discover the master // 
conf.set("hbase.master", "nnode:60000"); // zookeeper quorum conf.set("hbase.zookeeper.property.clientport", "2181"); conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2"); // 是否对Map Task启用推测执行机制 conf.setBoolean("mapreduce.map.speculative", false); // 是否对Reduce Task启用推测执行机制 conf.setBoolean("mapreduce.reduce.speculative", false); /** * HBase创建表 */ Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); TableName tableName = TableName.valueOf(TABLE_NAME); boolean exists = admin.tableExists(tableName); if (exists) { admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor tableDesc = new HTableDescriptor(tableName); HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME); tableDesc.addFamily(columnDesc); admin.createTable(tableDesc); /** * 读取文件内容 */ String fileName = "http_interceptor_20130913.txt"; Job job = Job.getInstance(conf); job.setJobName("MyBaseWriterToHBase"); job.setJarByClass(HBaseASDataTargetDriver.class); job.setMapperClass(MyMapper.class); /** * MapReduce读取文本文件时默认的的四个参数(KeyIn, ValueIn,KeyOut,ValueOut) * 说明: * 默认情况下KeyIn为IntWrite类型,为在文本文件中的偏移量,ValueIn为一行数据 * 第一次测试时未设置的设置map端输出的key-value类型,程序执行正常 * 第二次增加map端输出的key-value类型设置 * job.setMapOutputKeyClass * job.setMapOutputValueClass * Hadoop应用开发技术详解2015年1月第1版P191页写的: * map端输出的key-value默认类型分别为LongWritable和Text * 根据示例程序MyMapper中实现的map端输出的key-value实际为Text和IntWritable * // job.setMapOutputKeyClass(LongWritable.class); // job.setMapOutputValueClass(Text.class); // 设置后页面调用时报错如下: 15/09/04 22:19:06 INFO mapreduce.Job: Task Id : attempt_1441346242717_0014_m_000000_0, Status : FAILED Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712) at 
org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21) at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 第三次设置为与Mapper类中一致的,程序执行正确。 */ job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 下面这句话不能加,在测试中发现加了这句话竟然报错找不到MyReducer类了。 // job.setReducerClass(MyReducer.class); Path path = new Path(fileName); FileInputFormat.addInputPath(job, path); TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job); // for wordcount // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(IntWritable.class); // for multi columns job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } }
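未设置Map输出的key-value类型时报错如下。(wordcount示例未报错;《Hadoop应用开发技术详解》中说map端输出的key-value默认类型为LongWritable.class和Text.class,但wordcount示例中map端输出的key-value类型实际为Text.class和IntWritable.class。实际上,未显式设置时,map端输出类型默认取job.setOutputKeyClass/job.setOutputValueClass所设置的最终输出类型,因此只要Mapper实际输出的类型与之不一致就会报错。)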
15/09/04 21:15:54 INFO mapreduce.Job: map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
    at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
# 由于出现错误时Map端为0%,所以分析问题出现在map端;且根据提示信息,此时期望的value类型是IntWritable。第二次的示例与wordcount的差别主要在于map端输出的value由IntWritable变为Text,设置了如下参数后问题解决。
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);
wordcount及数据入库示例程序执行结果验证:
hbase(main):005:0> scan 't_inter_log'
ROW COLUMN+CELL
14100 column=cf:count, timestamp=1441370812728, value=1
16:04:08 column=cf:count, timestamp=1441370812728, value=4
18863:08 column=cf:count, timestamp=1441370812728, value=1
192.168.1.130 column=cf:count, timestamp=1441370812728, value=1
192.168.1.139 column=cf:count, timestamp=1441370812728, value=1
192.168.1.154 column=cf:count, timestamp=1441370812728, value=1
192.168.1.159 column=cf:count, timestamp=1441370812728, value=1
192.168.1.759 column=cf:count, timestamp=1441370812728, value=4
2013-09-13759 column=cf:count, timestamp=1441370812728, value=4
3904409-13759 column=cf:count, timestamp=1441370812728, value=1
4927409-13759 column=cf:count, timestamp=1441370812728, value=1
8027409-13759 column=cf:count, timestamp=1441370812728, value=4
HTTP409-13759 column=cf:count, timestamp=1441370812728, value=4
www.subnetc1.com column=cf:count, timestamp=1441370812728, value=1
www.subnetc1.com/index.html column=cf:count, timestamp=1441370812728, value=1
www.subnetc2.com/index.html column=cf:count, timestamp=1441370812728, value=1
www.subnetc3.com/index.html column=cf:count, timestamp=1441370812728, value=1
www.subnetc4.com/index.html column=cf:count, timestamp=1441370812728, value=1
18 row(s) in 1.2290 seconds
# 注:上面结果中18863:08、192.168.1.759、2013-09-13759等行键并非原始数据。生成这份结果时Reducer直接以Text.getBytes()作为行键,而该方法返回的底层数组可能比有效长度getLength()更长,于是带入了前一个较长key的残留字节。

# 每次执行时都会先删除t_inter_log表
hbase(main):007:0> scan 't_inter_log'
ROW COLUMN+CELL
www.subnetc1.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html
www.subnetc2.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html
www.subnetc3.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html
www.subnetc4.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html
4 row(s) in 3.3280 seconds
6、HBase作为共享源示例
示例
输出
数据
类型
程序
配置
集群
参数
机制
环境
问题
文件
时报
过程
处理
输入
代码
内容
开发技术
技术
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库转换dat
计算机网络技术考研文库
数字货币量化交易软件开发商
淘宝上货软件开发公司
数据库就是用来保存什么的仓库
上海联恩商钥互联网科技股份
软件开发研发立项终结
虹口区散射网络技术造型设计
机柜服务器位置管理
在数据库中创建会话命令
鹤壁市锦腾互联网科技有限公司
惠普服务器开机马上掉电
php读取数据库到列表
软件开发项目档案验收意见
数据库审计日志的作用
北京市网络安全细则
服务器怎么通过管理口做系统
jsp链接数据库的图片
台州同城游无法连接服务器
数据库如何删除表某行中数据
数据库图表技术
达内网络安全面试
成都邦邦网络安全
河南小赚丰网络技术公司
泰州工程软件开发哪家好
网络安全技术经验
罗斯文数据库使用技巧
贵州通信软件开发服务标准
天河区数据网络技术开发服务标准
骨质疏松病人数据库 nih