千家信息网

storm-kafka(storm spout作为kafka的消费端)

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,storm是grovvy写的kafka是scala写的storm-kafka storm连接kafka consumer的插件下载地址:https://github.com/wurstmeister/
千家信息网最后更新 2025年02月05日storm-kafka(storm spout作为kafka的消费端)

storm是grovvy写的

kafka是scala写的

storm-kafka storm连接kafka consumer的插件

下载地址:

https://github.com/wurstmeister/storm-kafka-0.8-plus




除了需要storm和kafka相关jar包还需要google-collections-1.0.jar

以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar

以前由com.netflix.curator组织开发现在归到org.apache.curator下面



1.Kafka Consumer即Storm Spout代码

package demo;import java.util.ArrayList;import java.util.List;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.TopologyBuilder;import storm.kafka.KafkaSpout;import storm.kafka.SpoutConfig;import storm.kafka.StringScheme;import storm.kafka.ZkHosts;public class MyKafkaSpout {public static void main(String[] args) {        String topic ="track";    ZkHosts zkhosts  = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");        SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,            "/MyKafka", //偏移量offset的根目录            "MyTrack");//子目录对应一个应用        List zkServers=new ArrayList();    //zkServers.add("192.168.1.107");    //zkServers.add("192.168.1.108");    for(String host:zkhosts.brokerZkStr.split(","))    {        zkServers.add(host.split(":")[0]);    }        spoutConfig.zkServers=zkServers;    spoutConfig.zkPort=2181;    spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的    spoutConfig.socketTimeoutMs=60;    spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型        TopologyBuilder builder=new TopologyBuilder();    builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1    builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");        Config config =new Config();    config.setDebug(true);//上线之前都要改成false否则日志会非常多    if(args.length>0){                try {            StormSubmitter.submitTopology(args[0], config, builder.createTopology());        } catch (AlreadyAliveException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (InvalidTopologyException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }            }else{                LocalCluster localCluster=new LocalCluster();        localCluster.submitTopology("mytopology", config,  builder.createTopology());        //本地模式在一个进程里面模拟一个storm集群的所有功能    }            }}


2.Bolt代码只是简单打印输出,覆写execute方法即可

package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class MyKafkaBolt implements IBasicBolt {    @Override    public void declareOutputFields(OutputFieldsDeclarer arg0) {        // TODO Auto-generated method stub    }    @Override    public Map getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }    @Override    public void execute(Tuple input, BasicOutputCollector arg1) {    String kafkaMsg =input.getString(0);    System.err.println("bolt"+kafkaMsg);    }    @Override    public void prepare(Map arg0, TopologyContext arg1) {        // TODO Auto-generated method stub    }}






代码 输出 消费 从头 功能 只是 地址 子目 子目录 实际 实际上 插件 插件下载 方法 日志 根目录 模式 类型 进程 集群 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 戴尔霄龙服务器厂商电话多少 北京数据软件开发价格服务标准 网络安全可信体系建设 安徽八八六软件开发公司电话 苏州法院网络安全专员 曙光服务器维修站 怎么修改数据库的缓存 石材创新互联网科技公司 定制软件开发要多少钱 海康威视硬盘阵列服务器 网络安全生态建设应当 查看服务器程序占用的端口命令 网络安全技术入门课程 软件开发专业英文翻译 阿克苏软件开发公司 单机dnf为什么要数据库 软件开发ea安装 关于网络安全的时事评议论文 用友u8数据库修复 水声通信网络技术水平 首选服务器设置 安徽安庆网络安全宣传 中国人寿软件开发终面 方舟手游服务器一直开不起来 win7终端服务器授权安装 计算机网络技术跟软件技术一样嘛 互联网的普及和科技的发展 工作站软件开发 农安什么是网络技术服务至上 信息类互联网科技公司
0