How to Use the Storm-HBase Interface


This article walks through how to use the Storm-HBase interface. The explanation is fairly detailed and should serve as a useful reference, so it is worth reading to the end.

package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*
 * A bolt that reads from HBase: it continuously looks up the specified column of an HBase table
 * and emits the row key and the column value as tuples.
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {

        private static final long serialVersionUID = 1L;

        private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;

        private static transient HBaseConnector connector = null;
        private static transient HBaseConfiguration conf = null;
        private static transient HBaseCommunicator communicator = null;

        OutputCollector _collector;

        /*
         * Constructor initializes the variables storing the HBase table information and connects to HBase.
         */
        public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {
                this.tableName = tableName;
                this.colFamilyName = colFamilyName;
                this.colName = colName;
                this.rowKeyField = rowKeyField;

                connector = new HBaseConnector();
                conf = connector.getHBaseConf(hbaseXmlLocation);
                communicator = new HBaseCommunicator(conf);
        }

        /*
         * Emits the value of the column with name @colName and row key @rowKey.
         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
         */
        public void execute(Tuple input, BasicOutputCollector collector) {
                String rowKey = input.getStringByField(this.rowKeyField);
                // Given the table name, row key, column family and column name, fetch the column value directly via the communicator.
                columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
                collector.emit(tuple(rowKey, columnValue));
        }

        public void prepare(Map confMap, TopologyContext context, OutputCollector collector) {
                _collector = collector;
        }

        public void cleanup() {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("rowKey", "columnValue"));
        }

        public Map getComponentConfiguration() {
                return null;
        }

        public void prepare(Map stormConf, TopologyContext context) {
        }
}

RowKeyEmitterSpout (emits the HBase row keys)

package storm.contrib.hbase.spouts;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.log4j.Logger;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/*
 * This spout emits HBase row keys; the set of row keys is hard-coded here.
 * Spout emitting tuples containing the row key of the HBase table.
 */
public class RowKeyEmitterSpout implements IRichSpout {

    private static final long serialVersionUID = 6814162766489261607L;
    public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);

    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public RowKeyEmitterSpout() {
        this(true);
    }

    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public boolean isDistributed() {
        return _isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        Utils.sleep(100);
        Thread.yield();
        // Pick one of the hard-coded row keys at random and emit it, using a random UUID as the message id.
        final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), UUID.randomUUID());
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map getComponentConfiguration() {
        return null;
    }
}
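For completeness, here is a minimal sketch of how the spout and bolt above might be wired together into a topology. The hbase-site.xml path, table name, column family and column name are placeholders chosen for illustration; the bolt's rowKeyField is set to "word", the field the RowKeyEmitterSpout declares.

package storm.contrib.hbase.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import storm.contrib.hbase.bolts.HBaseColumnValueLookUpBolt;
import storm.contrib.hbase.spouts.RowKeyEmitterSpout;

public class HBaseLookupTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // The spout emits a single field named "word", which the bolt reads as its row key.
        builder.setSpout("row-key-spout", new RowKeyEmitterSpout(), 1);

        // Path, table, column family and column are placeholders; substitute your own values.
        builder.setBolt("hbase-lookup-bolt",
                new HBaseColumnValueLookUpBolt("/etc/hbase/hbase-site.xml", "word",
                        "testTable", "colFamily", "col1"), 1)
               .shuffleGrouping("row-key-spout");

        Config conf = new Config();
        conf.setDebug(true);

        // Run locally for a short while, then shut down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("hbase-lookup", conf, builder.createTopology());
        Utils.sleep(30000);
        cluster.killTopology("hbase-lookup");
        cluster.shutdown();
    }
}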

// Simple code we use to test the system and verify that the interface works correctly

package storm.contrib.hbase.spouts;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class TestSpout implements IRichSpout {

        SpoutOutputCollector _collector;
        Random _rand;
        int count = 0;

        public boolean isDistributed() {
                return true;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                _collector = collector;
                _rand = new Random();
        }

        public void nextTuple() {
                Utils.sleep(1000);
                String[] words = new String[] { "hello", "tiwari", "indore", "jayati" };
                Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5 };
                // Cycle through the sample data; index against words.length so we never run past the shorter array.
                count = (count + 1) % words.length;
                String word = words[count];
                int number = numbers[count];
                _collector.emit(new Values(word, number));
        }

        public void close() {
        }

        public void ack(Object id) {
        }

        public void fail(Object id) {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "number"));
        }

        public void activate() {
        }

        public void deactivate() {
        }

        public Map getComponentConfiguration() {
                return null;
        }
}

This is all fairly straightforward, so there is little more to explain. Unlike the Storm-Kafka interface, the Storm-HBase interface does not handle polling or connection management itself; it simply builds an HBase connection, and during setup all you do is construct a Connector.
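For reference, here is a sketch of what a helper like HBaseCommunicator.getColEntry presumably does under the hood, using the classic HBase client API of that era (HTable/Get). The actual storm.contrib.hbase.utils implementation is not shown in this article and may differ; this is only an illustration of the single-cell lookup the bolt relies on.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class SimpleHBaseReader {

    private final Configuration conf;

    public SimpleHBaseReader(Configuration conf) {
        this.conf = conf;
    }

    // Fetch a single cell value: table + row key + column family + column qualifier.
    public String getColEntry(String tableName, String rowKey, String colFamily, String colName) {
        HTable table = null;
        try {
            table = new HTable(conf, tableName);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
            return value == null ? null : Bytes.toString(value);
        } catch (IOException e) {
            throw new RuntimeException("HBase lookup failed", e);
        } finally {
            if (table != null) {
                try { table.close(); } catch (IOException ignored) { }
            }
        }
    }
}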

That is everything in "How to Use the Storm-HBase Interface". Thanks for reading, and I hope the content is helpful.
