
HBase 1.0.1 Study Notes (4): Operating HBase with MapReduce


Lu Chunli's work notes. Who says programmers can't have a touch of artistic flair?



Environment:

hadoop-2.6.0

hbase-1.0.1

zookeeper-3.4.6

1. Hadoop cluster setup (omitted);

2. ZooKeeper cluster setup (omitted);

3. HBase cluster setup (omitted);

4. Example: HBase as an input source

View the current data in the HBase table m_domain:

[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]
hbase(main):002:0> scan 'm_domain'
ROW                              COLUMN+CELL
 alibaba.com_19990415_20220523   column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
 alibaba.com_19990415_20220523   column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
 alibaba.com_19990415_20220523   column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
 alibaba.com_19990415_20220523   column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
 alibaba.com_19990415_20220523   column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
 baidu.com_19991011_20151011     column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
 baidu.com_19991011_20151011     column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit

Implement the Mapper:

package com.invic.mapreduce.hbase.source;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * @author lucl
 * TableMapper extends Mapper; every Mapper class that uses HBase as its input source
 * needs to extend it.
 */
public class HBaseReaderMapper extends TableMapper<Text, Text> {
    private Text key = new Text();
    private Text value = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        // Case 1: the column family is known explicitly
        {
            NavigableMap<byte[], byte[]> map = result.getFamilyMap("cf".getBytes());
            Set<Entry<byte[], byte[]>> values = map.entrySet();
            for (Entry<byte[], byte[]> entry : values) {
                String columnQualifier = new String(entry.getKey());
                String cellValue = new String(entry.getValue());
                System.out.println(columnQualifier + "\t" + cellValue);
            }
        }

        // Case 2: multiple column families, or the family names are not known in advance
        {
            String rowKey = new String(row.get());
            byte[] columnFamily = null;
            byte[] columnQualifier = null;
            byte[] cellValue = null;

            StringBuffer sbf = new StringBuffer(1024);
            for (Cell cell : result.listCells()) {
                columnFamily = CellUtil.cloneFamily(cell);
                columnQualifier = CellUtil.cloneQualifier(cell);
                cellValue = CellUtil.cloneValue(cell);

                sbf.append(Bytes.toString(columnFamily));
                sbf.append(".");
                sbf.append(Bytes.toString(columnQualifier));
                sbf.append(":");
                sbf.append(new String(cellValue, "UTF-8"));
            }

            key.set(rowKey);
            value.set(sbf.toString());
            context.write(key, value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}

Implement the MapReduce Driver class:

package com.invic.mapreduce.hbase.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example: HBase as the input source of a MapReduce job.
 */
public class HBaseASDataSourceDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // For running inside Eclipse on Windows:
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");

        int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());

        // Hadoop HA parameters (normally picked up from the cluster configuration files)
        /*conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/

        // hbase master
        // The "hbase.master" property has been deprecated since 0.90;
        // passing the ZooKeeper configuration lets the client discover the master automatically.
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum. These must be set on the Configuration the Job is built from,
        // otherwise the client falls back to the default 127.0.0.1 quorum (see problem c below).
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");

        // Disable speculative execution for map and reduce tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseReaderFromHBase");
        job.setJarByClass(HBaseASDataSourceDriver.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /**
         * Rows read from HBase are handed to the Mapper registered below (HBaseReaderMapper),
         * which does the actual processing. Since no Reducer class is set on the job, the
         * default Reducer writes the Mapper output unchanged; define a custom Reducer if
         * additional processing is needed.
         */

        Scan scan = new Scan();
        // scan.addFamily(family);
        // scan.addColumn(family, qualifier);

        byte[] tableName = Bytes.toBytes("m_domain");

        TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class,
                Text.class, Text.class, job);

        Path path = new Path("/" + System.currentTimeMillis());
        FileOutputFormat.setOutputPath(job, path);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

View the results:
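The job writes plain text files into the timestamped output directory created by the driver. A minimal sketch, not part of the original notes, for dumping those part files from Java; the class name ReadJobOutput and the use of args[0] for the output directory are illustrative assumptions:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: prints the TextOutputFormat result of the job above.
public class ReadJobOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        // args[0] is the timestamped output directory created by the driver, e.g. "/1441366273000"
        for (FileStatus status : fs.listStatus(new Path(args[0]))) {
            if (!status.getPath().getName().startsWith("part-")) {
                continue;                           // skip _SUCCESS and other marker files
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath()), "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);       // rowkey \t cf.qualifier:value...
                }
            }
        }
        fs.close();
    }
}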

Problems encountered:

a. Running the job from Eclipse reported an error; the cause was not identified.

b. When running on the cluster, an error is reported if the Mapper class is defined inside the Driver class:

ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()
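A pattern that usually avoids this kind of failure, shown as a sketch under the assumption that the error comes from the Mapper being a non-static inner class (which the framework cannot create through a no-argument constructor), is to declare the Mapper as a public static nested class:

package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

// Hypothetical skeleton, not the original code: the outer class stands in for whatever
// driver class the Mapper is nested in.
public class NestedMapperSketch {

    // "static" is the important part: a non-static inner class keeps a hidden reference to
    // the enclosing instance, so it has no framework-visible no-arg constructor, which is
    // consistent with the "$HBaseReaderMapper init()" failure recorded above.
    public static class InnerHBaseReaderMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException, InterruptedException {
            // Minimal body: emit the row key and the number of cells in the row.
            context.write(new Text(row.get()), new Text(String.valueOf(result.size())));
        }
    }
}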

c. The ZooKeeper connection string always shows 127.0.0.1 instead of the configured hbase.zookeeper.quorum. This is likely to happen when the quorum properties are set on a Configuration object other than the one the Job is created from (for example on getConf() while the Job is built from a separate HBaseConfiguration instance).

It is not clear whether problems would also appear if the ZooKeeper cluster and the HBase cluster ran on different machines.
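One simple way to narrow this down is to print the quorum the client-side Configuration actually carries before the job is submitted; a minimal sketch, with the class name QuorumCheck made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical check: if this prints "localhost" or "127.0.0.1", the quorum properties were
// set on a different Configuration object than the one handed to the Job.
public class QuorumCheck {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        System.out.println("hbase.zookeeper.quorum     = " + conf.get("hbase.zookeeper.quorum"));
        System.out.println("hbase.zookeeper.clientPort = " + conf.get("hbase.zookeeper.property.clientPort"));
    }
}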

5. Example: HBase as an output source

The contents of the input text file are as follows:

2013-09-13 16:04:08    www.subnetc1.com        192.168.1.7     80      192.168.1.139   18863   HTTP    www.subnetc1.com/index.html
2013-09-13 16:04:08    www.subnetc2.com        192.168.1.7     80      192.168.1.159   14100   HTTP    www.subnetc2.com/index.html
2013-09-13 16:04:08    www.subnetc3.com        192.168.1.7     80      192.168.1.130   4927    HTTP    www.subnetc3.com/index.html
2013-09-13 16:04:08    www.subnetc4.com        192.168.1.7     80      192.168.1.154   39044   HTTP    www.subnetc4.com/index.html

Mapper code:

package com.invic.mapreduce.hbase.target;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // WordCount variant (would require Mapper<Object, Text, Text, IntWritable>):
        /*{
            IntWritable one = new IntWritable(1);
            Text word = new Text();

            StringTokenizer token = new StringTokenizer(value.toString());
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }*/

        // Write multiple columns into HBase (Mapper<Object, Text, Text, Text>):
        // emit the host name (second field) as the key and the whole line as the value.
        {
            String[] temps = value.toString().split("\t");
            if (null != temps && temps.length == 8) {
                Text word = new Text();
                word.set(temps[1]);
                context.write(word, value);
            }
        }
    }
}

Reducer code:

package com.invic.mapreduce.hbase.target;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * @author lucl
 */
public class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // WordCount variant (would require TableReducer<Text, IntWritable, ImmutableBytesWritable>
        // and an Iterable<IntWritable> value parameter):
        /*{
            int sum = 0;
            for (Iterator<IntWritable> it = value.iterator(); it.hasNext(); ) {
                IntWritable val = it.next();
                sum += val.get();
            }

            Put put = new Put(key.getBytes());
            // sum is an int; convert it to a String before taking its bytes, otherwise the
            // value cannot be displayed when scanning the table.
            byte[] datas = Bytes.toBytes(String.valueOf(sum));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), datas);
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }*/

        // Write multiple columns into HBase
        // (TableReducer<Text, Text, ImmutableBytesWritable>, Iterable<Text> value)
        {
            byte[] family = "cf".getBytes();

            Put put = new Put(key.getBytes());

            StringBuffer sbf = new StringBuffer();
            for (Text text : value) {
                sbf.append(text.toString());
            }

            put.addColumn(family, Bytes.toBytes("detail"), Bytes.toBytes(sbf.toString()));

            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
}

Driver class:

package com.invic.mapreduce.hbase.target;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example: HBase as the output target of a MapReduce job.
 */
public class HBaseASDataTargetDriver extends Configured implements Tool {
    private static final String TABLE_NAME = "t_inter_log";
    private static final String COLUMN_FAMILY_NAME = "cf";

    public static void main(String[] args) throws Exception {
        // For running inside Eclipse on Windows:
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");

        int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());

        // Hadoop HA parameters
        conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // hbase master
        // The "hbase.master" property has been deprecated since 0.90;
        // passing the ZooKeeper configuration lets the client discover the master automatically.
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");

        // Disable speculative execution for map and reduce tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);

        /**
         * Create the HBase table (dropping it first if it already exists).
         */
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf(TABLE_NAME);

        boolean exists = admin.tableExists(tableName);
        if (exists) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }

        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME);
        tableDesc.addFamily(columnDesc);
        admin.createTable(tableDesc);

        admin.close();
        connection.close();

        /**
         * Read the input file.
         */
        String fileName = "http_interceptor_20130913.txt";

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseWriterToHBase");
        job.setJarByClass(HBaseASDataTargetDriver.class);

        job.setMapperClass(MyMapper.class);

        /*
         * Notes on the four Mapper type parameters (KeyIn, ValueIn, KeyOut, ValueOut) when
         * reading a text file: by default KeyIn is the byte offset of the line within the
         * file and ValueIn is the line itself.
         *
         * First test: the map output key-value types were not set and the job ran fine.
         * Second test: the map output types were set via job.setMapOutputKeyClass /
         * job.setMapOutputValueClass. "Hadoop应用开发技术详解" (1st edition, January 2015, p. 191)
         * says the default map output key-value types are LongWritable and Text, while the
         * wordcount version of MyMapper actually emits Text and IntWritable, so setting
         *     job.setMapOutputKeyClass(LongWritable.class);
         *     job.setMapOutputValueClass(Text.class);
         * failed at runtime with:
         *
         *     Error: java.io.IOException: Type mismatch in key from map: expected
         *     org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
         *         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
         *         at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
         *         at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
         *         at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
         *         at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21)
         *         at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
         *         at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
         *         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
         *         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
         *         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
         *         at java.security.AccessController.doPrivileged(Native Method)
         *         at javax.security.auth.Subject.doAs(Subject.java:415)
         *         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
         *         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
         *
         * Third test: the types were set to match what the Mapper actually emits and the job
         * ran correctly.
         */
        // Must match MyMapper's output types; for the multi-column version both key and value are Text.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Do not set the Reducer class here; TableMapReduceUtil.initTableReducerJob below
        // already registers MyReducer, and adding this line caused a ClassNotFound error
        // for MyReducer during testing.
        // job.setReducerClass(MyReducer.class);

        Path path = new Path(fileName);
        FileInputFormat.addInputPath(job, path);

        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job);

        // for wordcount
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(IntWritable.class);

        // for multi columns
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

When the map output key-value types were not set, the following error was reported. (The wordcount example did not report this error; "Hadoop应用开发技术详解" states that the default map output key-value types are LongWritable.class and Text.class, yet in the wordcount example the map output key-value types are actually Text.class and IntWritable.)

15/09/04 21:15:54 INFO mapreduce.Job:  map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
        at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

# The map progress was at 0% when the error occurred, so the problem is on the map side. The
# message says the expected value type is IntWritable; the main difference between this example
# and wordcount is that the map output value changed from IntWritable to Text. Setting the
# following parameters solved the problem:
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);

Verifying the execution results of the wordcount and the data-loading example programs:

hbase(main):005:0> scan 't_inter_log'
ROW                            COLUMN+CELL
 14100                         column=cf:count, timestamp=1441370812728, value=1
 16:04:08                      column=cf:count, timestamp=1441370812728, value=4
 18863                         column=cf:count, timestamp=1441370812728, value=1
 192.168.1.130                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.139                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.154                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.159                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.7                   column=cf:count, timestamp=1441370812728, value=4
 2013-09-13                    column=cf:count, timestamp=1441370812728, value=4
 39044                         column=cf:count, timestamp=1441370812728, value=1
 4927                          column=cf:count, timestamp=1441370812728, value=1
 80                            column=cf:count, timestamp=1441370812728, value=4
 HTTP                          column=cf:count, timestamp=1441370812728, value=4
 www.subnetc1.com              column=cf:count, timestamp=1441370812728, value=1
 www.subnetc1.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc2.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc3.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc4.com/index.html   column=cf:count, timestamp=1441370812728, value=1
18 row(s) in 1.2290 seconds

# The t_inter_log table is dropped and recreated before each run.
hbase(main):007:0> scan 't_inter_log'
ROW                  COLUMN+CELL
 www.subnetc1.com    column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html
 www.subnetc2.com    column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html
 www.subnetc3.com    column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html
 www.subnetc4.com    column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html
4 row(s) in 3.3280 seconds
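As an alternative to the HBase shell, the loaded rows can also be checked from Java. A minimal sketch; the class name ScanInterLog is illustrative, and the quorum values are the ones used in the driver above:

package com.invic.mapreduce.hbase.target;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper: scans t_inter_log and prints rowkey plus the cf:detail value.
public class ScanInterLog {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t_inter_log"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String rowKey = Bytes.toString(result.getRow());
                String detail = Bytes.toString(
                        result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("detail")));
                System.out.println(rowKey + " => " + detail);
            }
        }
    }
}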

6. Example: HBase as a shared source (both the input and the output of a job)
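The original notes end before this section is filled in. As a hedged sketch of the pattern such an example would typically follow, the driver of a job that both reads from and writes to HBase combines initTableMapperJob with initTableReducerJob; the target table m_domain_copy is a placeholder assumed to already exist:

package com.invic.mapreduce.hbase.both;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical sketch only, not part of the original notes: copies every cell of m_domain
// into the placeholder table m_domain_copy, using HBase as both source and sink.
public class HBaseSourceAndSinkSketch {

    public static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(row.get());
            for (Cell cell : result.listCells()) {
                put.add(cell);                      // keep family/qualifier/value unchanged
            }
            context.write(row, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf, "HBaseToHBase");
        job.setJarByClass(HBaseSourceAndSinkSketch.class);

        Scan scan = new Scan();
        scan.setCaching(500);           // fetch rows in batches
        scan.setCacheBlocks(false);     // recommended for full-table MapReduce scans

        // HBase as source: read m_domain with the mapper above.
        TableMapReduceUtil.initTableMapperJob("m_domain", scan, CopyMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        // HBase as sink: IdentityTableReducer writes each Put it receives to the target table.
        TableMapReduceUtil.initTableReducerJob("m_domain_copy", IdentityTableReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}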

