storm怎么构建拓扑代码
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,这篇文章主要讲解了"storm怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"storm怎么构建拓扑代码"吧!1. 构建拓扑代码pack
千家信息网最后更新 2025年02月23日storm怎么构建拓扑代码
这篇文章主要讲解了"storm怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"storm怎么构建拓扑代码"吧!
1. 构建拓扑代码
package demo;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class AreaAmtTopo { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt"); }}
2.一级过滤bolt
package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//一级的过滤boltpublic class AreaFilterBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name } @Override public MapgetComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { //order_id,order_amt,create_time,area_id String order=input.getString(0);//取出集合values中的第一个value if(order!=null){ String orderArr[]=order.split("\\t"); collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time } } @Override public void prepare(Map arg0, TopologyContext arg1) { // TODO Auto-generated method stub }}
3.局部汇总bolt(按日期和区域和汇总)
package demo;import java.util.HashMap;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//局部汇总public class AreaAmtBolt implements IBasicBolt { MapcountsMap=null; @Override public void declareOutputFields( OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_area","amt")); } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { // TODO Auto-generated method stub countsMap =new HashMap (); } @Override public void execute(Tuple input, BasicOutputCollector collector) { if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发 { String area_id=input.getString(0); Double order_amt=input.getDouble(1); String order_date=input.getStringByField("order_date"); Double count=countsMap.get(area_id+"_"+order_date); if (count==null){ count = 0.0; } count+=order_amt; countsMap.put(area_id+"_"+order_date,count); System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count); collector.emit(new Values(area_id+"_"+order_date,count)); } } @Override public void cleanup() { countsMap.clear(); }}
4. 最终结果写入Hbase
package demo;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//结果定时写入hbase的boltpublic class AreaRsltBolt implements IBasicBolt { MapcountsMap=null; long beginTime=System.currentTimeMillis(); long endTime=0L; HBaseDao dao=null; @Override public void declareOutputFields( OutputFieldsDeclarer paramOutputFieldsDeclarer) { // TODO Auto-generated method stub } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { countsMap =new HashMap (); dao=new HBaseDAOImp(); } @Override public void execute(Tuple input, BasicOutputCollector paramBasicOutputCollector) { String date_areaid=input.getString(0); double order_amt=input.getDouble(1); countsMap.put(date_areaid,order_amt); endTime=System.currentTimeMillis(); if (endTime-beginTime>=5*1000){ for(String key:countsMap.keySet()){ //put into hbase //2014-05-05_1,amt dao.insert("area_order","cf","order_amt",countsMap.get(key)); System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key)); } beginTime=System.currentTimeMillis(); } } @Override public void cleanup() { // TODO Auto-generated method stub }}
5. DateFmt代码
package demo;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;public class DateFmt { public static final String date_long="yyyy-MM-dd HH:mm:ss"; public static final String date_short="yyyy-MM-dd"; public static SimpleDateFormat sdf=new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton){ SimpleDateFormat sdf=new SimpleDateFormat(patton); Calendar cal =Calendar.getInstance(); if (date!=null){ try { cal.setTime(sdf.parse(date)); } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception{ return sdf.parse(dateStr); } public static void main(String[] args) { System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long)); }}
感谢各位的阅读,以上就是"storm怎么构建拓扑代码"的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
代码
拓扑
学习
内容
局部
结果
区域
就是
思路
情况
数据
文章
日期
更多
知识
知识点
篇文章
跟着
问题
实践
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全工程师灰色收入
c 代理服务器
怎么查看sql数据库
法律发挥数据库
自学软件安全网络安全
下面()是流行的数据库技术
服务器改名iis
死亡骑士服务器在哪里
软件开发费用快速估算
神州网云网络安全
服务器坏了一个硬盘还能用么
数据库教研
邯山区租房软件开发
amh数据库
数据库访问行为分析仪的功能
给自己的电脑装了一台服务器
给网络安全饭圈乱象的海报
什么导致了网络安全恶化
莱芜农村抽血数据库
软件开发写代码赚钱
云服务器基准性能测试最佳实践
我网络技术有限公司
精神文明网络安全小组
数据库安全发生漏洞的案例
http 访问mysql服务器
网络安全模式从装系统
临海千顺网络技术公司
软件开发外部接口协议
厦门网络安全插画
渝北大学网络安全专业