HBase-1.0.1学习笔记(四)MapReduce操作HBase
发表于:2024-10-27 作者:千家信息网编辑
千家信息网最后更新 2024年10月27日,鲁春利的工作笔记,谁说程序员不能有文艺范?环境:hadoop-2.6.0hbase-1.0.1zookeeper-3.4.61、Hadoop集群配置过程略;2、Zookeeper集群配置过程略;3、H
千家信息网最后更新 2024年10月27日HBase-1.0.1学习笔记(四)MapReduce操作HBase
鲁春利的工作笔记,谁说程序员不能有文艺范?
环境:
hadoop-2.6.0
hbase-1.0.1
zookeeper-3.4.6
1、Hadoop集群配置过程略;
2、Zookeeper集群配置过程略;
3、HBase集群配置过程略;
4、HBase作为输入源示例
查看当前hbase表m_domain中的数据
[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]
hbase(main):002:0> scan 'm_domain'
ROW                              COLUMN+CELL
 alibaba.com_19990415_20220523   column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
 alibaba.com_19990415_20220523   column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
 alibaba.com_19990415_20220523   column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
 alibaba.com_19990415_20220523   column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
 alibaba.com_19990415_20220523   column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
 baidu.com_19991011_20151011     column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
 baidu.com_19991011_20151011     column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit
实现Mapper端
package com.invic.mapreduce.hbase.source;import java.io.IOException;import java.util.Map;import java.util.Map.Entry;import java.util.NavigableMap;import java.util.Set;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;/** * * @author lucl * TableMapper扩展自Mapper类,所有以HBase作为输入源的Mapper类都需要继承该类 */public class HBaseReaderMapper extends TableMapper{ private Text key = new Text(); private Text value = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(ImmutableBytesWritable row, Result result,Context context) throws IOException, InterruptedException { // 可以明确给定family { NavigableMap map = result.getFamilyMap("cf".getBytes()); Set > values = map.entrySet(); for (Entry entry : values) { String columnQualifier = new String(entry.getKey()); String cellValue = new String(entry.getValue()); System.out.println(columnQualifier + "\t" + cellValue); // } } // 存在多个列族或者不确定列族名字 { String rowKey = new String(row.get()); byte [] columnFamily = null; byte [] columnQualifier = null; byte [] cellValue = null; StringBuffer sbf = new StringBuffer(1024); for (Cell cell : result.listCells()) { columnFamily = CellUtil.cloneFamily(cell); columnQualifier = CellUtil.cloneQualifier(cell); cellValue = CellUtil.cloneValue(cell); sbf.append(Bytes.toString(columnFamily)); sbf.append("."); sbf.append(Bytes.toString(columnQualifier)); sbf.append(":"); sbf.append(new String(cellValue, "UTF-8")); } key.set(rowKey); value.set(sbf.toString()); context.write(key, value); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException{ super.cleanup(context); }}
实现MapReduce的Driver类
package com.invic.mapreduce.hbase.source;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * * @author lucl * HBase作为输入源示例 * */public class HBaseASDataSourceDriver extends Configured implements Tool { /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\"); int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args); System.out.println("receive exit : " + exit); } @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); // hadoop的参数配置 /*conf.set("fs.defaultFS", "hdfs://cluster"); conf.set("dfs.nameservices", "cluster"); conf.set("dfs.ha.namenodes.cluster", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020"); conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020"); conf.set("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/ // hbase master // property "hbase.master" has been deprecated since 0.90 // Just passing the ZK configuration makes your client auto-discover the master // conf.set("hbase.master", "nnode:60000"); // zookeeper quorum getConf().set("hbase.zookeeper.property.clientport", "2181"); getConf().set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2"); // 是否对Map Task启用推测执行机制 getConf().setBoolean("mapreduce.map.speculative", false); // 是否对Reduce 
Task启用推测执行机制 getConf().setBoolean("mapreduce.reduce.speculative", false); Job job = Job.getInstance(conf); job.setJobName("MyBaseReaderFromHBase"); job.setJarByClass(HBaseASDataSourceDriver.class); job.setOutputFormatClass(TextOutputFormat.class); /** * 从HBase读取数据时数据会传给下面定义的Mapper来,在Mapper类中进行了数据的处理 * 由于在job中未指定Reducer类,会调用默认的Reducer类来将Mapper的输出原封不动的写入; * 如果需要在Reducer中再做些其他的单独的处理,则可以自定义Reducer类再做些处理。 */ Scan scan = new Scan(); // scan.addFamily(family); // scan.addColumn(family, qualifier); byte [] tableName = Bytes.toBytes("m_domain"); TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class, Text.class, Text.class, job); Path path = new Path("/" + System.currentTimeMillis()); FileOutputFormat.setOutputPath(job, path); return job.waitForCompletion(true) ? 0 : 1; } }
查看结果:
问题记录:
a. 通过Eclipse执行时报错,但未分析出原因
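b. 放到集群环境中运行时,如果Mapper类定义为Driver类的内部类,则报错:
ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper.<init>()
原因通常是:Hadoop通过反射调用无参构造器来实例化Mapper,而非static内部类的构造器隐含一个外部类实例参数,因此不存在无参构造器。若要把Mapper写在Driver类中,需要将其声明为 public static class;否则像本文这样,把Mapper放到单独的类文件中。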
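c. zookeeper连接串总是显示连接的是127.0.0.1,而非配置的hbase.zookeeper.quorum。
如果zookeeper集群与hbase部署在不同的机器上,不知道是否会出现问题。
可能原因:如果zookeeper参数设置在getConf()返回的Configuration上,而Job使用的是HBaseConfiguration.create()新建的另一个Configuration,这些设置就不会生效,客户端会退回默认的本机地址。应使用HBaseConfiguration.create(getConf())创建conf,并把所有参数都设置在这一个conf上。另外,端口参数名区分大小写,应为hbase.zookeeper.property.clientPort。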
5、Hbase作为输出源示例
文本文件内容如下:
2013-09-13 16:04:08 www.subnetc1.com 192.168.1.7 80 192.168.1.139 18863 HTTP www.subnetc1.com/index.html
2013-09-13 16:04:08 www.subnetc2.com 192.168.1.7 80 192.168.1.159 14100 HTTP www.subnetc2.com/index.html
2013-09-13 16:04:08 www.subnetc3.com 192.168.1.7 80 192.168.1.130 4927 HTTP www.subnetc3.com/index.html
2013-09-13 16:04:08 www.subnetc4.com 192.168.1.7 80 192.168.1.154 39044 HTTP www.subnetc4.com/index.html
Map端代码:
package com.invic.mapreduce.hbase.target;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/* NOTE(review): the generic type arguments and the entire class body of MyMapper are missing from this copy (lost during extraction), so it does not compile as shown. The "Type mismatch ... received org.apache.hadoop.io.Text" job logs in this document suggest it emits Text keys and Text values (presumably Mapper<LongWritable, Text, Text, Text>) - TODO confirm and restore the original source. */public class MyMapper extends Mapper
Reducer端代码:
package com.invic.mapreduce.hbase.target;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;/** * * @author lucl * */public class MyReducer extends TableReducer{ @Override public void reduce(Text key, Iterable value, Context context) throws IOException, InterruptedException { // for wordcount // TableReducer // Iterable /*{ int sum = 0; for (Iterator it = value.iterator(); it.hasNext(); ) { IntWritableval = it.next(); sum += val.get(); } Put put = new Put(key.getBytes()); // sum为Integer类型,需要先转为S他ring,然后再取byte值,否则查看数据时无法显示sum的值 byte [] datas = Bytes.toBytes(String.valueOf(sum)); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), datas); context.write(new ImmutableBytesWritable(key.getBytes()), put); }*/ // 需要将多列写入HBase // TableReducer // Iterable value { byte [] family = "cf".getBytes(); Put put = new Put(key.getBytes()); StringBuffer sbf = new StringBuffer(); for (Text text : value) { sbf.append(text.toString()); } put.addColumn(family, Bytes.toBytes("detail"), Bytes.toBytes(sbf.toString())); context.write(new ImmutableBytesWritable(key.getBytes()), put); } }}
Driver驱动类:
package com.invic.mapreduce.hbase.target;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * * @author lucl * HBase作为输出源示例 * */public class HBaseASDataTargetDriver extends Configured implements Tool { private static final String TABLE_NAME = "t_inter_log"; private static final String COLUMN_FAMILY_NAME = "cf"; /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // for eclipse // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\"); int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args); System.out.println("receive exit : " + exit); } @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(getConf()); // hadoop的参数配置 conf.set("fs.defaultFS", "hdfs://cluster"); conf.set("dfs.nameservices", "cluster"); conf.set("dfs.ha.namenodes.cluster", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020"); conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020"); conf.set("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); // hbase master // property "hbase.master" has been deprecated since 0.90 // Just passing the ZK configuration makes your client auto-discover the master // 
conf.set("hbase.master", "nnode:60000"); // zookeeper quorum conf.set("hbase.zookeeper.property.clientport", "2181"); conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2"); // 是否对Map Task启用推测执行机制 conf.setBoolean("mapreduce.map.speculative", false); // 是否对Reduce Task启用推测执行机制 conf.setBoolean("mapreduce.reduce.speculative", false); /** * HBase创建表 */ Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); TableName tableName = TableName.valueOf(TABLE_NAME); boolean exists = admin.tableExists(tableName); if (exists) { admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor tableDesc = new HTableDescriptor(tableName); HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME); tableDesc.addFamily(columnDesc); admin.createTable(tableDesc); /** * 读取文件内容 */ String fileName = "http_interceptor_20130913.txt"; Job job = Job.getInstance(conf); job.setJobName("MyBaseWriterToHBase"); job.setJarByClass(HBaseASDataTargetDriver.class); job.setMapperClass(MyMapper.class); /** * MapReduce读取文本文件时默认的的四个参数(KeyIn, ValueIn,KeyOut,ValueOut) * 说明: * 默认情况下KeyIn为IntWrite类型,为在文本文件中的偏移量,ValueIn为一行数据 * 第一次测试时未设置的设置map端输出的key-value类型,程序执行正常 * 第二次增加map端输出的key-value类型设置 * job.setMapOutputKeyClass * job.setMapOutputValueClass * Hadoop应用开发技术详解2015年1月第1版P191页写的: * map端输出的key-value默认类型分别为LongWritable和Text * 根据示例程序MyMapper中实现的map端输出的key-value实际为Text和IntWritable * // job.setMapOutputKeyClass(LongWritable.class); // job.setMapOutputValueClass(Text.class); // 设置后页面调用时报错如下: 15/09/04 22:19:06 INFO mapreduce.Job: Task Id : attempt_1441346242717_0014_m_000000_0, Status : FAILED Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712) at 
org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21) at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 第三次设置为与Mapper类中一致的,程序执行正确。 */ job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 下面这句话不能加,在测试中发现加了这句话竟然报错找不到MyReducer类了。 // job.setReducerClass(MyReducer.class); Path path = new Path(fileName); FileInputFormat.addInputPath(job, path); TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job); // for wordcount // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(IntWritable.class); // for multi columns job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } }
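未设置Map输出的key-value类型时,报错如下。wordcount示例并未报错。《Hadoop应用开发技术详解》中说map端输出的key-value默认类型为LongWritable.class和Text.class,但wordcount示例中map端输出的key-value类型却是Text.class和IntWritable.class。实际上,未显式调用setMapOutputKeyClass/setMapOutputValueClass时,map端输出类型默认取job最终输出的类型(即setOutputKeyClass/setOutputValueClass的设置);只有这两项也未设置时,才默认为LongWritable和Text。wordcount示例设置了setOutputKeyClass(Text.class)和setOutputValueClass(IntWritable.class),所以map端按Text/IntWritable检查,没有报错。下面错误信息中期望的是IntWritable,说明当时job的输出value类型被设置成了IntWritable,而本例map端实际输出的value是Text,因此报错: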
15/09/04 21:15:54 INFO mapreduce.Job:  map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
	at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
	at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
# 出现错误时Map端进度为0%,因此判断问题出在map端。根据提示信息,框架期望的map输出value类型是IntWritable。第二个示例与wordcount的主要差别在于map端输出的value由IntWritable改成了Text,设置如下参数后问题解决:
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);
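wordcount及数据入库示例程序执行结果验证:
注意:第一次scan(wordcount结果)中的 18863:08、192.168.1.759、2013-09-13759、3904409-13759、HTTP409-13759 等行键是错误的。它们是新行键覆盖在前一个更长行键上后残留的字节,例如"18863"写在"16:04:08"上就得到"18863:08"。原因是Reducer中用 key.getBytes() 作为行键:Text.getBytes() 返回的是可复用的内部缓冲区,只有前 key.getLength() 个字节有效。应改用 Bytes.toBytes(key.toString()),或 Arrays.copyOf(key.getBytes(), key.getLength())。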
hbase(main):005:0> scan 't_inter_log'ROW COLUMN+CELL 14100 column=cf:count, timestamp=1441370812728, value=1 16:04:08 column=cf:count, timestamp=1441370812728, value=4 18863:08 column=cf:count, timestamp=1441370812728, value=1 192.168.1.130 column=cf:count, timestamp=1441370812728, value=1 192.168.1.139 column=cf:count, timestamp=1441370812728, value=1 192.168.1.154 column=cf:count, timestamp=1441370812728, value=1 192.168.1.159 column=cf:count, timestamp=1441370812728, value=1 192.168.1.759 column=cf:count, timestamp=1441370812728, value=4 2013-09-13759 column=cf:count, timestamp=1441370812728, value=4 3904409-13759 column=cf:count, timestamp=1441370812728, value=1 4927409-13759 column=cf:count, timestamp=1441370812728, value=1 8027409-13759 column=cf:count, timestamp=1441370812728, value=4 HTTP409-13759 column=cf:count, timestamp=1441370812728, value=4 www.subnetc1.com column=cf:count, timestamp=1441370812728, value=1 www.subnetc1.com/index.html column=cf:count, timestamp=1441370812728, value=1 www.subnetc2.com/index.html column=cf:count, timestamp=1441370812728, value=1 www.subnetc3.com/index.html column=cf:count, timestamp=1441370812728, value=1 www.subnetc4.com/index.html column=cf:count, timestamp=1441370812728, value=1 18 row(s) in 1.2290 seconds # 每次执行时都会先删除t_inter_log表hbase(main):007:0> scan 't_inter_log'ROW COLUMN+CELLwww.subnetc1.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html www.subnetc2.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html www.subnetc3.com column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html www.subnetc4.com column=cf:detail, timestamp=1441373481468, 
value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html4 row(s) in 3.3280 seconds
6、HBase作为共享源示例
示例
输出
数据
类型
程序
配置
集群
参数
机制
环境
问题
文件
时报
过程
处理
输入
代码
内容
开发技术
技术
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
300英雄2020服务器列表
软件开发程序员第一步做什么
域dns服务器解析慢
茂名专业软件开发代理价格
mac能用sql数据库
做数据库去小县城
王英建 网络安全
山东招远网络安全吗
关于云服务器租赁采购讨论事宜
虚拟主机的数据库地址
公司介绍软件开发公司电话
json 数据库 传值
考研网络安全
北京hp服务器阵列卡安装云主机
网络安全法 拒不整改
关于软件开发类的问题
苏州必加互联网科技有限公司
太原网络安全招聘
黔东南软件开发培训学校
服务器4个硬盘能分2个区吗
我的世界服务器新手奖励怎么设置
易语言手机软件开发工具
青岛辅德网络技术有限公司
软件开发设计大纲
更换服务器ip后网站打不开
临海银行网络安全宣传
先进网络安全服务
网络安全和防范摘要
使用笔记本对服务器本地管理
山西通讯软件开发服务应用