
HBase and MapReduce by Example

Published 2024-10-26 · 千家信息网 editorial staff

This article walks through an example of using HBase together with MapReduce. Many readers have questions about how the two fit together in practice, so the steps below collect a simple, working approach. Hopefully it helps clear things up; follow along and try it yourself.

A directory on HDFS contains several text files. A MapReduce job builds an inverted index over the contents of those files, and a second job writes the result into an HBase table. The complete code is given in the four classes below.
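To make the data flow concrete, here is roughly what the intermediate records look like for a word such as Hello that occurs once in each input file (the full hdfs:// paths of the input splits are shortened to "..."):

map output:      ("Hello:hdfs://.../invertedindex1.txt", "1")
combiner output: ("Hello", "hdfs://.../invertedindex1.txt:1")
reducer output:  ("Hello", "hdfs://.../invertedindex1.txt:1;hdfs://.../invertedindex2.txt:1;")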

1.InvertedIndexMapper

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyInfo = new Text();   // holds the "word:URI" combination
    private Text valueInfo = new Text(); // holds the term frequency
    private FileSplit split;             // the input split being processed

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        System.out.println("key-->: " + key + "\n value --> : " + value);

        // Get the FileSplit this record belongs to.
        split = (FileSplit) context.getInputSplit();
        System.out.println("split---> " + split.toString());

        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // The key is made up of the word and the URI of the file it came from.
            keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
            // The initial term frequency is 1.
            valueInfo.set("1");
            context.write(keyInfo, valueInfo);
        }
    }
}

2.InvertedIndexCombiner

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private Text info = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the term frequency for this "word:URI" key.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex = key.toString().indexOf(":");
        // The new value is made up of the URI and the term frequency.
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // The new key is just the word.
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}

3.InvertedIndexReducer

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Build the document list: "URI:count;URI:count;..."
        StringBuilder fileList = new StringBuilder();
        for (Text value : values) {
            fileList.append(value.toString()).append(";");
        }
        result.set(fileList.toString());
        context.write(key, result);
    }
}

4.HBaseAndInvertedIndex

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseAndInvertedIndex {

    private static Path outPath;

    public static void main(String[] args) throws Exception {
        run();
        System.out.println("\n\n************************");
        runHBase();
    }

    /** Job 1: build the inverted index and write it to HDFS. */
    public static void run() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hadoop-InvertedIndex");

        job.setJarByClass(HBaseAndInvertedIndex.class);

        // The map function turns each input <key, value> pair into intermediate results.
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/txt/invertedindex/"));

        // Use a timestamp so each run writes to a fresh output directory.
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssS");
        String filename = df.format(new Date());
        outPath = new Path("hdfs://192.168.226.129:9000/rootdir/invertedindexhbase/result/" + filename + "/");
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }

    /** Job 2: load the inverted index from HDFS into the HBase table. */
    public static void runHBase() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "HBase-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read the output of the first job and write it into HBase.
        FileInputFormat.addInputPath(job, new Path(outPath.toString() + "/part-r-00000"));
        System.out.println("path---> " + outPath.toString());
        TableMapReduceUtil.initTableReducerJob("invertedindex", InvertedIndexHBaseReducer.class, job);

        // Make sure the target table exists before the job runs.
        checkTable(conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static void checkTable(Configuration conf) throws Exception {
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("invertedindex");
        if (!admin.tableExists(tn)) {
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("indexKey");
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("Table did not exist; created it.");
        }
    }

    /**
     * 1. The map stage still reads plain text from HDFS, so it needs no changes. The reduce
     *    stage writes into HBase, so it extends TableReducer; TableReducer fixes the output
     *    value type, which must be a Put or Delete instance.
     * 2. ImmutableBytesWritable is a byte-sequence type that can be used as a key or value.
     */
    public static class InvertedIndexHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("key---> " + key.toString());
            // Note how the row key is built from the word itself.
            Put put = new Put(key.toString().getBytes());
            // Convert the Text value through toString() so only its valid bytes are written.
            put.addColumn(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"),
                    Bytes.toBytes(values.iterator().next().toString()));
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
}

Input data files in the source directory:

invertedindex1.txt:

Hello I will Learning Hadoop
HDFS MapReduce
Other I will Learning HBase

invertedindex2.txt:

Hello HBase
MapReduce HDFS
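For completeness: the input files have to exist under hdfs://192.168.226.129:9000/txt/invertedindex/ before run() is started. Below is a minimal upload sketch using the Hadoop FileSystem API; the local /tmp source paths and the class name UploadSampleFiles are illustrative assumptions, so adjust them to your environment.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadSampleFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connect to the same NameNode the jobs use.
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.226.129:9000"), conf);
        Path inputDir = new Path("/txt/invertedindex/");
        fs.mkdirs(inputDir);
        // Local source paths are assumptions; point them at wherever the sample files live.
        fs.copyFromLocalFile(new Path("/tmp/invertedindex1.txt"), inputDir);
        fs.copyFromLocalFile(new Path("/tmp/invertedindex2.txt"), inputDir);
        fs.close();
    }
}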

Check the result with scan in the HBase shell:

hbase(main):002:0> scan 'invertedindex'
ROW        COLUMN+CELL
 HBase     column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;
 HDFS      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hadoop    column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hello     column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 I         column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Learning  column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 MapReduce column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Other     column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 will      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
9 row(s) in 0.2240 seconds
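The same cells can also be read back programmatically through the HBase client API. Here is a minimal sketch reusing the table, column family, and qualifier from the job above; the class name InvertedIndexGet and the choice of the row "Hello" are only for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class InvertedIndexGet {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("invertedindex"))) {
            // Fetch the row whose row key is the word "Hello".
            Result result = table.get(new Get(Bytes.toBytes("Hello")));
            byte[] value = result.getValue(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"));
            System.out.println("Hello -> " + Bytes.toString(value));
        }
    }
}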

That concludes this walkthrough of HBase and MapReduce by example. Hopefully it has answered the common questions; pairing the theory with a hands-on run is the best way to learn, so give it a try.
