storm怎么构建拓扑代码
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,这篇文章主要讲解了"storm怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"storm怎么构建拓扑代码"吧!1. 构建拓扑代码pack
千家信息网最后更新 2025年01月22日storm怎么构建拓扑代码
这篇文章主要讲解了"storm怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"storm怎么构建拓扑代码"吧!
1. 构建拓扑代码
package demo;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class AreaAmtTopo { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt"); }}
2.一级过滤bolt
package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//一级的过滤boltpublic class AreaFilterBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name } @Override public MapgetComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { //order_id,order_amt,create_time,area_id String order=input.getString(0);//取出集合values中的第一个value if(order!=null){ String orderArr[]=order.split("\\t"); collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time } } @Override public void prepare(Map arg0, TopologyContext arg1) { // TODO Auto-generated method stub }}
3.局部汇总bolt(按日期和区域和汇总)
package demo;import java.util.HashMap;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//局部汇总public class AreaAmtBolt implements IBasicBolt { MapcountsMap=null; @Override public void declareOutputFields( OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_area","amt")); } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { // TODO Auto-generated method stub countsMap =new HashMap (); } @Override public void execute(Tuple input, BasicOutputCollector collector) { if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发 { String area_id=input.getString(0); Double order_amt=input.getDouble(1); String order_date=input.getStringByField("order_date"); Double count=countsMap.get(area_id+"_"+order_date); if (count==null){ count = 0.0; } count+=order_amt; countsMap.put(area_id+"_"+order_date,count); System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count); collector.emit(new Values(area_id+"_"+order_date,count)); } } @Override public void cleanup() { countsMap.clear(); }}
4. 最终结果写入Hbase
package demo;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//结果定时写入hbase的boltpublic class AreaRsltBolt implements IBasicBolt { MapcountsMap=null; long beginTime=System.currentTimeMillis(); long endTime=0L; HBaseDao dao=null; @Override public void declareOutputFields( OutputFieldsDeclarer paramOutputFieldsDeclarer) { // TODO Auto-generated method stub } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { countsMap =new HashMap (); dao=new HBaseDAOImp(); } @Override public void execute(Tuple input, BasicOutputCollector paramBasicOutputCollector) { String date_areaid=input.getString(0); double order_amt=input.getDouble(1); countsMap.put(date_areaid,order_amt); endTime=System.currentTimeMillis(); if (endTime-beginTime>=5*1000){ for(String key:countsMap.keySet()){ //put into hbase //2014-05-05_1,amt dao.insert("area_order","cf","order_amt",countsMap.get(key)); System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key)); } beginTime=System.currentTimeMillis(); } } @Override public void cleanup() { // TODO Auto-generated method stub }}
5. DateFmt代码
package demo;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;public class DateFmt { public static final String date_long="yyyy-MM-dd HH:mm:ss"; public static final String date_short="yyyy-MM-dd"; public static SimpleDateFormat sdf=new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton){ SimpleDateFormat sdf=new SimpleDateFormat(patton); Calendar cal =Calendar.getInstance(); if (date!=null){ try { cal.setTime(sdf.parse(date)); } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception{ return sdf.parse(dateStr); } public static void main(String[] args) { System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long)); }}
感谢各位的阅读,以上就是"storm怎么构建拓扑代码"的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
代码
拓扑
学习
内容
局部
结果
区域
就是
思路
情况
数据
文章
日期
更多
知识
知识点
篇文章
跟着
问题
实践
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件工程属于计算机网络技术吗
苹果手机wifi提示网络安全
问卷调查如何建立数据库
基础网络安全产品
中欧软件开发
阿里云 服务器搭建网站
常德鸿灿网络技术有限公司
网络安全教育公益讲座安道新
云南数据网络安全工程问答知识
软件开发算工科还是理科
南京同在互联网科技
软件开发兼职青岛
软件开发技术图片
普华永道取软件开发
十堰网络技术学校
yuga 数据库
网络安全卫士怎么处理
mc换服务器
服务器svn管理工具下载
rwdis数据库
应对机关网络安全采取的措施
网盘文件存在服务器的哪里
华硕软件开发工资
北京一站式网络技术有哪些
服务器管理器 音频服务器
psn 换服务器
网络安全法第三十四条规范
数据库原来及应用雷景生答案
oracle数据库频繁掉线
计算机网络技术命令表