千家信息网

Storm MongoDB接口怎么使用

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,本篇内容介绍了"Storm MongoDB接口怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
千家信息网最后更新 2025年02月02日Storm MongoDB接口怎么使用

本篇内容介绍了"Storm MongoDB接口怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

整体的Storm接口分为以下几个类:

1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java

下面直接看代码(本文给出前两个类 MongoBolt 与 MongoSpout 的实现):

1: MongoBolt.java

package storm.mongo;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import com.mongodb.DB;import com.mongodb.DBObject;import com.mongodb.MongoClient;import com.mongodb.MongoException;import com.mongodb.WriteConcern;/** * * 注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于Mongo的Storm交互做了一次封装 * * @author Adrian Petrescu  * */public abstract class MongoBolt extends BaseRichBolt {        private OutputCollector collector;                // MOngDB的DB对象        private DB mongoDB;                        //记录我们的主机,端口,和MongoDB的数据DB民粹        private final String mongoHost;        private final int mongoPort;        private final String mongoDbName;        /**         * @param mongoHost The host on which Mongo is running.         * @param mongoPort The port on which Mongo is running.         * @param mongoDbName The Mongo database containing all collections being         * written to.         
*/        protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {                this.mongoHost = mongoHost;                this.mongoPort = mongoPort;                this.mongoDbName = mongoDbName;        }                @Override        public void prepare(                        @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {                                this.collector = collector;                try {                                        //prepare方法目前在初始化的过程之中得到了一个Mongo的连接                        this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);                } catch (Exception e) {                        throw new RuntimeException(e);                }        }        @Override        public void execute(Tuple input) {                                    //注意我们在这里还有一个判断,判断当前是否该发射                        if (shouldActOnInput(input)) {                        String collectionName = getMongoCollectionForInput(input);                        DBObject dbObject = getDBObjectForInput(input);                        if (dbObject != null) {                                try {                                        mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));                                        collector.ack(input);                                } catch (MongoException me) {                                        collector.fail(input);                                }                        }                } else {                        collector.ack(input);                }        }        /**         * Decide whether or not this input tuple should trigger a Mongo write.         
*         * @param input the input tuple under consideration         * @return {@code true} iff this input tuple should trigger a Mongo write         */        public abstract boolean shouldActOnInput(Tuple input);                /**         * Returns the Mongo collection which the input tuple should be written to.         *         * @param input the input tuple under consideration         * @return the Mongo collection which the input tuple should be written to         */        public abstract String getMongoCollectionForInput(Tuple input);                /**         * Returns the DBObject to store in Mongo for the specified input tuple.         *                   拿到DBObject的一个抽象类                           * @param input the input tuple under consideration         * @return the DBObject to be written to Mongo         */        public abstract DBObject getDBObjectForInput(Tuple input);                        //注意这里随着计算的终结被关闭了。        @Override        public void cleanup() {                this.mongoDB.getMongo().close();        }}

2: MongoSpout.java

package storm.mongo;import java.util.List;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicBoolean;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.utils.Utils;import com.mongodb.BasicDBObject;import com.mongodb.Bytes;import com.mongodb.DB;import com.mongodb.DBCursor;import com.mongodb.DBObject;import com.mongodb.MongoClient;import com.mongodb.MongoException;/*** A Spout which consumes documents from a Mongodb tailable cursor.** Subclasses should simply override two methods:* 
    *
  • {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}*
  • {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns* a Mongo document into a Storm tuple matching the declared output fields.*
***

* WARNING: You can only use tailable cursors on capped collections.* * @author Dan Beaulieu **/// 在这里,抽象的过程中,依旧保持了第一层的Spout为一个抽象类,MongoSpout为abstract的一个抽象类,子类在继承这// 个类的过程之中实现特定的方法即可// 这里还有一个类似Cursor的操作。public abstract class MongoSpout extends BaseRichSpout { private SpoutOutputCollector collector; private LinkedBlockingQueue queue; private final AtomicBoolean opened = new AtomicBoolean(false); private DB mongoDB; private final DBObject query; private final String mongoHost; private final int mongoPort; private final String mongoDbName; private final String mongoCollectionName; public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; this.mongoCollectionName = mongoCollectionName; this.query = query; } class TailableCursorThread extends Thread { // 内部类 TailableCursorThread线程 //注意在其中我们使用了LinkedBlockingQueue的对象,有关java高并发的集合类,请参考本ID的【Java集合类型的博文】博文。 LinkedBlockingQueue queue; String mongoCollectionName; DB mongoDB; DBObject query; public TailableCursorThread(LinkedBlockingQueue queue, DB mongoDB, String mongoCollectionName, DBObject query) { this.queue = queue; this.mongoDB = mongoDB; this.mongoCollectionName = mongoCollectionName; this.query = query; } public void run() { while(opened.get()) { try { // create the cursor mongoDB.requestStart(); final DBCursor cursor = mongoDB.getCollection(mongoCollectionName) .find(query) .sort(new BasicDBObject("$natural", 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); try { while (opened.get() && cursor.hasNext()) { final DBObject doc = cursor.next(); if (doc == null) break; queue.put(doc); } } finally { try { if (cursor != null) cursor.close(); } catch (final Throwable t) { } try { mongoDB.requestDone(); } catch (final Throwable t) { } } Utils.sleep(500); } catch (final MongoException.CursorNotFound cnf) { // rethrow only if something went wrong 
while we expect the cursor to be open. if (opened.get()) { throw cnf; } } catch (InterruptedException e) { break; } } }; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.queue = new LinkedBlockingQueue(1000); try { this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query); this.opened.set(true); listener.start(); } @Override public void close() { this.opened.set(false); } @Override public void nextTuple() { DBObject dbo = this.queue.poll(); if(dbo == null) { Utils.sleep(50); } else { this.collector.emit(dbObjectToStormTuple(dbo)); } } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } public abstract List dbObjectToStormTuple(DBObject message);}

"Storm MongoDB接口怎么使用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0