千家信息网

Storm-Hbase接口怎么用

发表于:2024-11-29 作者:千家信息网编辑
千家信息网最后更新 2024年11月29日,这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!package storm.contrib.hbase.bolts;import
千家信息网最后更新 2024年11月29日Storm-Hbase接口怎么用

这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/*   一个读取Hbase的Bolt,不断的从Hbase中读取表中的行KEY,和列,通过tuples来发送 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt {                private static final long serialVersionUID = 1L;                private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;                private static transient HBaseConnector connector = null;        private static transient HBaseConfiguration conf = null;        private static transient HBaseCommunicator communicator = null;        OutputCollector _collector;        /*         * Constructor initializes the variables storing the hbase table information and connects to hbase         */        public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {                this.tableName = tableName;                this.colFamilyName = colFamilyName;                this.colName = colName;                this.rowKeyField = rowKeyField;                connector = new HBaseConnector();                conf = connector.getHBaseConf(hbaseXmlLocation);                communicator = new HBaseCommunicator(conf);        }        /*         * emits the value of the column with name @colName and rowkey @rowKey         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)         */        public void execute(Tuple input, BasicOutputCollector collector) {                String rowKey = input.getStringByField(this.rowKeyField);                columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);                collector.emit(tuple(rowKey, columnValue));        }        public void prepare(Map confMap, TopologyContext context,                        OutputCollector collector) {                _collector = collector;        }        public void cleanup() {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("rowKey", "columnValue"));        }        public Map getComponentConfiguration() {                Map map = null;                return map;        }        public void prepare(Map stormConf, TopologyContext context) {        }}
package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/* * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt {                private static final long serialVersionUID = 1L;                private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;                private static transient HBaseConnector connector = null;        private static transient HBaseConfiguration conf = null;        private static transient HBaseCommunicator communicator = null;        OutputCollector _collector;        /*         * Constructor initializes the variables storing the hbase table information and connects to hbase         */        public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {                this.tableName = tableName;                this.colFamilyName = colFamilyName;                this.colName = colName;                this.rowKeyField = rowKeyField;                connector = new HBaseConnector();                conf = connector.getHBaseConf(hbaseXmlLocation);                communicator = new HBaseCommunicator(conf);        }        /*         * emits the value of the column with name @colName and rowkey @rowKey         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)         */        public void execute(Tuple input, BasicOutputCollector collector) {                String rowKey = input.getStringByField(this.rowKeyField);                        //通过指定我们的 表名,行键,列族,列名,直接通过communitor拿到列的值。                columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);                collector.emit(tuple(rowKey, columnValue));        }        public void prepare(Map confMap, TopologyContext context,                        OutputCollector collector) {                _collector = collector;        }        public void cleanup() {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("rowKey", "columnValue"));        }        public Map getComponentConfiguration() {                Map map = null;                return map;        }        public void prepare(Map stormConf, TopologyContext context) {        }}

Rowkey

package storm.contrib.hbase.spouts;import backtype.storm.topology.OutputFieldsDeclarer;import java.util.Map;import java.util.UUID;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.Random;import org.apache.log4j.Logger;/*这个Spout主要是用来发射 Hbase的RowKey,rowkey的集合为自己设置的。 * Spout emitting tuples containing the rowkey of the hbase table */public class RowKeyEmitterSpout implements IRichSpout {            private static final long serialVersionUID = 6814162766489261607L;        public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);    boolean _isDistributed;    SpoutOutputCollector _collector;    public RowKeyEmitterSpout() {        this(true);    }    public RowKeyEmitterSpout(boolean isDistributed) {        _isDistributed = isDistributed;    }        public boolean isDistributed() {        return _isDistributed;    }        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        _collector = collector;    }        public void close() {    }            public void nextTuple() {        Utils.sleep(100);        Thread.yield();        final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};        final Random rand = new Random();        final String word = words[rand.nextInt(words.length)];        _collector.emit(new Values(word), UUID.randomUUID());    }        public void ack(Object msgId) {    }    public void fail(Object msgId) {    }        public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("word"));    }        public void activate() {        }        public void deactivate() {        }        public Map getComponentConfiguration() {                return null;        }}

// 我们用来简单的测试系统的代码,测试接口是否正确

package storm.contrib.hbase.spouts;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class TestSpout implements IRichSpout {        SpoutOutputCollector _collector;        Random _rand;          int count = 0;        public boolean isDistributed() {                return true;        }        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {                _collector = collector;                _rand = new Random();        }        public void nextTuple() {                Utils.sleep(1000);                String[] words = new String[] { "hello", "tiwari", "indore", "jayati"};                Integer[] numbers = new Integer[] {                                1,2,3,4,5                };                if(count == numbers.length -1) {                        count = 0;                }                count ++;                int number = numbers[count];                String word = words[count];                int randomNum = (int) (Math.random()*1000);                _collector.emit(new Values(word, number));        }        public void close() {                }        public void ack(Object id) {        }        public void fail(Object id) {        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("word", "number"));        }        public void activate() {        }        public void deactivate() {        }        public Map getComponentConfiguration() {                return null;        }}

比较简单,也就不做解释了,Storm-hbase的接口并没有像Storm-kafka的接口那样,自身去处理轮询,自身去处理连接的问题。只是简单的构造了一个Hbase的连接,在连接的过程之中,直接构造了一个Connector就可以了。

以上是"Storm-Hbase接口怎么用"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

接口 内容 篇文章 处理 测试 不断 之中 代码 价值 兴趣 只是 小伙 小伙伴 更多 测试系统 知识 系统 行业 资讯 资讯频道 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库用户名不存在 对于服务器厂商的认识 青少年网络安全意识引导 空间数据库建立案例 数据库表分批次导出 上海办公系统软件开发有用吗 网络安全 四种能力 超级计算机网络安全 反恐精英服务器维护 互联网数据中心排名服务器台数 cf哪个区服务器优化 英雄联盟手游各个服务器兑换比例 龙口定制软件开发公司有哪些 苏州通信网络技术包括什么 为何家庭网络安全类型不安全 忍者风暴最强服务器 有软件开发经验的个人简历 怎么检查数据库是否加了索引 柏云服务器上传歌曲 网络安全优化社区服务 浙江省信息通信行业网络安全竞赛 基于事故致因理论的网络安全 计量数据库 集成平台服务器 系统与服务器断开连接 软件开发企业交什么税 数据库怎样删除不及格成绩的信息 数据库语言哪四类 我们俗称的网卡就是指网络服务器 网络安全金典语录
0