千家信息网

storm drpc怎么定义

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容介绍了"storm drpc怎么定义"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!代码:pa
千家信息网最后更新 2025年02月06日storm drpc怎么定义

本篇内容介绍了"storm drpc怎么定义"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

代码:

package main.java;import main.java.bolt.RequestCounter;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.StormSubmitter;import backtype.storm.drpc.LinearDRPCTopologyBuilder;import backtype.storm.utils.DRPCClient;/** * DRPC example *   * @author sjyu * */public class DRPCTopologyMain {        public static void main(String[] args) throws Exception {//              LocalDRPC drpc = new LocalDRPC();                DRPCClient drpc = new DRPCClient("192.168.1.240", 3772);                LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("test_func");                builder.addBolt(new RequestCounter(), 2);                Config conf = new Config();                conf.setDebug(true);                StormSubmitter.submitTopology("drpc_test", conf, builder.createRemoteTopology());//              LocalCluster cluster = new LocalCluster();//              cluster.submitTopology("local_cluster", conf, builder.createLocalTopology(drpc));                String str = drpc.execute("test_func", "this is a test");                //这边drpc的client和server写在一起了,不知道可不可以写在两个进程里,                //想像中应该没问题,就像网络编程一样,但是行不行还有待验证。                System.out.println(str);        }}
package main.java.bolt;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class RequestCounter extends BaseBasicBolt {//      Object id = new Object();这边好像不能定义一个变量,不然就报错,//      不知道是storm的原因还是java本来就不能这样(我觉得我似乎要去学学java了- -)        @Override        public void execute(Tuple input, BasicOutputCollector collector) {                String str = (String) input.getString(1);                collector.emit(new Values(input.getValue(0), str));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("id","result"));        }}

"storm drpc怎么定义"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0