千家信息网

MapReduce on HBase

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日：org.apache.hadoop.hbase.mapreduce 包中的 TableMapper、TableReducer；一个 region 对应一个 map 任务。import java.io.IOException; ……
千家信息网最后更新 2025年01月20日 MapReduce on HBase


org.apache.hadoop.hbase.mapreduce


TableMapper、TableReducer


一个 region 对应一个 map 任务（即每个 region 由一个 mapper 处理）。

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Reducer;public class HbaseMR {    public class MyMapper extends TableMapper {        @Override        protected void map(ImmutableBytesWritable key, Result value,                Context context) throws IOException, InterruptedException {            // key代表rowkey            Text k = new Text(Bytes.toString(key.get()));            Text v = new Text(Bytes.toString(value.getValue(                    "basicinfo".getBytes(), "age".getBytes())));            context.write(v, k);        }    }    public class MyReducer extends TableReducer {        @Override        protected void reduce(Text key, Iterable values, Context context)                throws IOException, InterruptedException {            Put put = new Put(Bytes.toBytes(key.toString()));            for (Text value : values) {                put.add(Bytes.toBytes("f1"), Bytes.toBytes(value.toString()),                        Bytes.toBytes(value.toString()));            }            context.write(null, put);        }    }    public static void main(String[] args) {        Configuration conf=    HBaseConfiguration.create();        try {            Job job=new Job(conf, "mapreduce on hbase");            job.setJarByClass(HbaseMR.class);            Scan scan=new Scan();            scan.setCaching(1000);//            
TableMapReduceUtil.initTableMapperJob("students", scan, MyMapper.class, Text.class, Text.class, job);            TableMapReduceUtil.initTableReducerJob("student-age",  MyReducer.class,  job);            job.waitForCompletion(true);        } catch (Exception e) {                        e.printStackTrace();        }    }}


0