千家信息网

Storm-Hbase接口怎么用

发表于:2025-01-28 作者:千家信息网编辑
千家信息网最后更新 2025年01月28日,这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!package storm.contrib.hbase.bolts;import
千家信息网最后更新 2025年01月28日Storm-Hbase接口怎么用

这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/*   一个读取Hbase的Bolt,不断的从Hbase中读取表中的行KEY,和列,通过tuples来发送 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt {                private static final long serialVersionUID = 1L;                private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;                private static transient HBaseConnector connector = null;        private static transient HBaseConfiguration conf = null;        private static transient HBaseCommunicator communicator = null;        OutputCollector _collector;        /*         * Constructor initializes the variables storing the hbase table information and connects to hbase         */        public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {                this.tableName = tableName;                this.colFamilyName = colFamilyName;                this.colName = colName;                this.rowKeyField = rowKeyField;                connector = new HBaseConnector();                conf = connector.getHBaseConf(hbaseXmlLocation);                communicator = new HBaseCommunicator(conf);        }        /*         * emits the value of the column with name @colName and rowkey @rowKey         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)         */        public void execute(Tuple input, BasicOutputCollector collector) {                String rowKey = input.getStringByField(this.rowKeyField);                columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);                collector.emit(tuple(rowKey, columnValue));        }        public void prepare(Map confMap, TopologyContext context,                        OutputCollector collector) {                _collector = collector;        }        public void cleanup() {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("rowKey", "columnValue"));        }        public Map getComponentConfiguration() {                Map map = null;                return map;        }        public void prepare(Map stormConf, TopologyContext context) {        }}
package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/* * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt {                private static final long serialVersionUID = 1L;                private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;                private static transient HBaseConnector connector = null;        private static transient HBaseConfiguration conf = null;        private static transient HBaseCommunicator communicator = null;        OutputCollector _collector;        /*         * Constructor initializes the variables storing the hbase table information and connects to hbase         */        public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {                this.tableName = tableName;                this.colFamilyName = colFamilyName;                this.colName = colName;                this.rowKeyField = rowKeyField;                connector = new HBaseConnector();                conf = connector.getHBaseConf(hbaseXmlLocation);                communicator = new HBaseCommunicator(conf);        }        /*         * emits the value of the column with name @colName and rowkey @rowKey         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)         */        public void execute(Tuple input, BasicOutputCollector collector) {                String rowKey = input.getStringByField(this.rowKeyField);                        //通过指定我们的 表名,行键,列族,列名,直接通过communitor拿到列的值。                columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);                collector.emit(tuple(rowKey, columnValue));        }        public void prepare(Map confMap, TopologyContext context,                        OutputCollector collector) {                _collector = collector;        }        public void cleanup() {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("rowKey", "columnValue"));        }        public Map getComponentConfiguration() {                Map map = null;                return map;        }        public void prepare(Map stormConf, TopologyContext context) {        }}

Rowkey

package storm.contrib.hbase.spouts;import backtype.storm.topology.OutputFieldsDeclarer;import java.util.Map;import java.util.UUID;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.Random;import org.apache.log4j.Logger;/*这个Spout主要是用来发射 Hbase的RowKey,rowkey的集合为自己设置的。 * Spout emitting tuples containing the rowkey of the hbase table */public class RowKeyEmitterSpout implements IRichSpout {            private static final long serialVersionUID = 6814162766489261607L;        public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);    boolean _isDistributed;    SpoutOutputCollector _collector;    public RowKeyEmitterSpout() {        this(true);    }    public RowKeyEmitterSpout(boolean isDistributed) {        _isDistributed = isDistributed;    }        public boolean isDistributed() {        return _isDistributed;    }        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        _collector = collector;    }        public void close() {    }            public void nextTuple() {        Utils.sleep(100);        Thread.yield();        final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};        final Random rand = new Random();        final String word = words[rand.nextInt(words.length)];        _collector.emit(new Values(word), UUID.randomUUID());    }        public void ack(Object msgId) {    }    public void fail(Object msgId) {    }        public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("word"));    }        public void activate() {        }        public void deactivate() {        }        public Map getComponentConfiguration() {                return null;        }}

// 我们用来简单的测试系统的代码,测试接口是否正确

package storm.contrib.hbase.spouts;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class TestSpout implements IRichSpout {        SpoutOutputCollector _collector;        Random _rand;          int count = 0;        public boolean isDistributed() {                return true;        }        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {                _collector = collector;                _rand = new Random();        }        public void nextTuple() {                Utils.sleep(1000);                String[] words = new String[] { "hello", "tiwari", "indore", "jayati"};                Integer[] numbers = new Integer[] {                                1,2,3,4,5                };                if(count == numbers.length -1) {                        count = 0;                }                count ++;                int number = numbers[count];                String word = words[count];                int randomNum = (int) (Math.random()*1000);                _collector.emit(new Values(word, number));        }        public void close() {                }        public void ack(Object id) {        }        public void fail(Object id) {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("word", "number"));        }        public void activate() {        }        public void deactivate() {        }        public Map getComponentConfiguration() {                return null;        }}

比较简单,也就不做解释了,Storm-hbase的接口并没有像Storm-kafka的接口那样,自身去处理轮询,自身去处理连接的问题。只是简单的构造了一个Hbase的连接,在连接的过程之中,直接构造了一个Connector就可以了。

以上是"Storm-Hbase接口怎么用"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

接口 内容 篇文章 处理 测试 不断 之中 代码 价值 兴趣 只是 小伙 小伙伴 更多 测试系统 知识 系统 行业 资讯 资讯频道 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发简历的项目经验 二级网络安全教育 哪种品牌的软件开发外包 深圳系统软件开发价位 通信网络安全服务能力评定通知 软件开发是不是刚需 pg数据库添加只读权限账户 如果要修改数据库中存储的数据 林业数据库林地保护等级 免备案的云服务器 sql 条件排除数据库 国家电脑网络安全安徽中心 什么是网络安全的英语怎么说 华为服务器二手的可以卖多少 囊谦县网络安全宣传周短视频 软件开发类工程管理制度 网络技术的由来 怎么修改oracle数据库名称 数据库原理关系运算的定义 网络技术出来是干什么的 电力电子软件开发工程师 软件开发环境 实验室 考验数据库专业课 直男聊天软件开发 阿里巴巴手机云服务器下载 自研数据库设备 数据库使用查询思维导图 网络安全可编程硬件平台 湖北移动校招网络技术岗笔试 安徽信息软件开发商家
0