千家信息网

如何实现一个MapReduce读取数据存入HBase

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章给大家介绍如何实现一个MapReduce读取数据存入HBase,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。车辆位置数据文件,格式:车辆id 速度:油耗:当前里程。通过M
千家信息网最后更新 2025年02月04日如何实现一个MapReduce读取数据存入HBase

这篇文章给大家介绍如何实现一个MapReduce读取数据存入HBase,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

车辆位置数据文件,格式:车辆id 速度:油耗:当前里程。

通过MapReduce算出每辆车的平均速度、油耗、里程

vid1 78:8:120vid1 56:11:124vid1 98:5:130vid1 72:6:131vid2 78:4:281vid2 58:9:298vid2 67:15:309

创建Map类和map函数

import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class VehicleMapper extends Mapper {        @Override        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {                String vehicle = value.toString();// 将输入的纯文本的数据转换成String                // 将输入的数据先按行进行分割                StringTokenizer tokenizerArticle = new StringTokenizer(vehicle, "\n");                // 分别对每一行进行处理                while (tokenizerArticle.hasMoreTokens()) {                        // 每行按空格划分                        StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());                        String vehicleId = tokenizer.nextToken(); // vid                        String vehicleInfo = tokenizer.nextToken(); // 车辆信息                        Text vid = new Text(vehicleId);                        Text info = new Text(vehicleInfo);                        context.write(vid, info);                }        }}

创建Reduce类

import java.io.IOException;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;public class VehicleReduce extends TableReducer {        @Override        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {                int speed = 0;                int oil = 0;                int mile = 0;                int count = 0;                for (Text val : values) {                        String str = val.toString();                        String[] arr = str.split(":");                        speed += Integer.valueOf(arr[0]);                        oil += Integer.valueOf(arr[1]);                        mile += Integer.valueOf(arr[2]) - mile; // 累积里程                        count++;                }                speed = (int) speed / count; // 求平均值                oil = (int) oil / count;                mile = (int) mile / count;                String result = speed + ":" + oil + ":" + mile;                Put put = new Put(key.getBytes());                put.add(Bytes.toBytes("info"), Bytes.toBytes("property"), Bytes.toBytes(result));                ImmutableBytesWritable keys = new ImmutableBytesWritable(key.getBytes());                context.write(keys, put);        }}

运行任务

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class VehicleMapReduceJob {        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {                Configuration conf = new Configuration();                conf = HBaseConfiguration.create(conf);                Job job = new Job(conf, "HBase_VehicleInfo");                job.setJarByClass(VehicleMapReduceJob.class);                job.setMapperClass(VehicleMapper.class);                job.setMapOutputKeyClass(Text.class);                job.setMapOutputValueClass(Text.class);                FileInputFormat.addInputPath(job, new Path(args[0])); // 设置输入文件路径                TableMapReduceUtil.initTableReducerJob("vehicle", VehicleReduce.class, job);                System.exit(job.waitForCompletion(true) ? 0 : 1);        }}

将代码导出成vehicle.jar,放在hadoop-1.2.1目录下,输入命令

./bin/hadoop jar vehicle.jar com/xh/vehicle/VehicleMapReduceJob input/vehicle.txt

HBase结果查询:

关于如何实现一个MapReduce读取数据存入HBase就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0