千家信息网

Storm流方式的统计系统怎么实现

发表于:2024-09-27 作者:千家信息网编辑
千家信息网最后更新 2024年09月27日,本篇内容主要讲解"Storm流方式的统计系统怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm流方式的统计系统怎么实现"吧!1: 初期硬件准
千家信息网最后更新 2024年09月27日Storm流方式的统计系统怎么实现

本篇内容主要讲解"Storm流方式的统计系统怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm流方式的统计系统怎么实现"吧!

1: 初期硬件准备:

1 如果条件具备,请确保您已经安装好 Redis 集群

2 配置好您的Storm开发环境

3 确保开发环境的网络畅通:主机与主机之间、Storm 与 Redis 之间都能够互相访问

2:业务背景的介绍:

1 在这里我们将模拟一个 流方式的数据处理过程

2 数据的源头保存在我们的redis 集群之中

3 发射的数据格式为: ip,url,client_key

数据发射器

package storm.spout;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Values;import backtype.storm.tuple.Fields;import org.json.simple.JSONObject;import org.json.simple.JSONValue;import redis.clients.jedis.Jedis;import storm.utils.Conf;import java.util.Map;import org.apache.log4j.Logger;/** * click Spout 从redis中间读取所需要的数据 */public class ClickSpout extends BaseRichSpout {        private static final long serialVersionUID = -6200450568987812474L;        public static Logger LOG = Logger.getLogger(ClickSpout.class);        // 对于redis,我们使用的是jedis客户端        private Jedis jedis;        // 主机        private String host;        // 端口        private int port;        // Spout 收集器        private SpoutOutputCollector collector;        @Override        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {                        // 这里,我们发射的格式为                // IP,URL,CLIENT_KEY                outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,                                storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));        }        @Override        public void open(Map conf, TopologyContext topologyContext,                        SpoutOutputCollector spoutOutputCollector) {                host = conf.get(Conf.REDIS_HOST_KEY).toString();                port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());                this.collector = spoutOutputCollector;                connectToRedis();        }        private void connectToRedis() {                jedis = new Jedis(host, port);        }        @Override        public void nextTuple() {                String content = jedis.rpop("count");                if (content == null || "nil".equals(content)) {                        try {                                Thread.sleep(300);             
           } catch (InterruptedException e) {                        }                } else {                        // 将jedis对象 rpop出来的字符串解析为 json对象                        JSONObject obj = (JSONObject) JSONValue.parse(content);                        String ip = obj.get(storm.cookbook.Fields.IP).toString();                        String url = obj.get(storm.cookbook.Fields.URL).toString();                        String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)                                        .toString();                        System.out.println("this is a clientKey");                        // List tuple对象                        collector.emit(new Values(ip, url, clientKey));                }        }}

在这个过程之中,请注意:

1 我们在 open 方法中初始化 host、port 和 collector,并调用 connectToRedis 方法建立到 Redis 数据库的连接

2 我们在 nextTuple 方法中从 Redis 取出数据,将其解析为一个 JSON 对象,从中取出 ip、url 和 clientKey,再将它们包装成一个 Values 对象发射出去

让我们来看看数据的流向图:

在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt

1 : repeatVisitBolt

2 : geographyBolt

共同来读取同一个数据源的数据:clickSpout

3 详细查看 RepeatVisitBolt

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import redis.clients.jedis.Jedis;import storm.utils.Conf;import java.util.Map;public class RepeatVisitBolt extends BaseRichBolt {        private OutputCollector collector;        private Jedis jedis;        private String host;        private int port;        @Override        public void prepare(Map conf, TopologyContext topologyContext,                        OutputCollector outputCollector) {                this.collector = outputCollector;                host = conf.get(Conf.REDIS_HOST_KEY).toString();                port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());                connectToRedis();        }        private void connectToRedis() {                jedis = new Jedis(host, port);                jedis.connect();        }        public boolean isConnected() {                if (jedis == null)                        return false;                return jedis.isConnected();        }        @Override        public void execute(Tuple tuple) {                String ip = tuple.getStringByField(storm.cookbook.Fields.IP);                String clientKey = tuple                                .getStringByField(storm.cookbook.Fields.CLIENT_KEY);                String url = tuple.getStringByField(storm.cookbook.Fields.URL);                String key = url + ":" + clientKey;                String value = jedis.get(key);                                // redis中取,如果redis中没有,就插入新的一条访问记录。                if (value == null) {                        jedis.set(key, "visited");                        collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));                } else {                        collector                                 
       .emit(new Values(clientKey, url, Boolean.FALSE.toString()));                }        }        @Override        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {                outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(                                storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,                                storm.cookbook.Fields.UNIQUE));        }}

在这里,我们把 url 和 clientKey 组合成【url:clientKey】格式的键,并以此键在 Redis 中查找:如果不存在,就将其写入 Redis,并判定这次访问为【unique】(独立访问)。

4:统计总访问量与独立访问量的 VisitStatsBolt:

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class VisitStatsBolt extends BaseRichBolt {    private OutputCollector collector;    private int total = 0;    private int uniqueCount = 0;    @Override    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {        this.collector = outputCollector;    }    @Override    public void execute(Tuple tuple) {                        //在这里,我们在上游来判断这个Fields 是否是独特和唯一的        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));                total++;        if(unique)uniqueCount++;        collector.emit(new Values(total,uniqueCount));    }    @Override    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,                        storm.cookbook.Fields.TOTAL_UNIQUE));    }}

当某个【url:clientKey】组合第一次出现时,独立访问数(UV)加 1。

5 接下来,看看流水线2 :

package storm.bolt;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import org.json.simple.JSONObject;import storm.cookbook.IPResolver;import java.util.HashMap;import java.util.List;import java.util.Map;/** * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use * File | Settings | File Templates. */public class GeographyBolt extends BaseRichBolt {        // ip解析器        private IPResolver resolver;        private OutputCollector collector;        public GeographyBolt(IPResolver resolver) {                this.resolver = resolver;        }        @Override        public void prepare(Map map, TopologyContext topologyContext,                        OutputCollector outputCollector) {                this.collector = outputCollector;        }        @Override        public void execute(Tuple tuple) {                // 1 从上级的目录之中拿到我们所要使用的ip                String ip = tuple.getStringByField(storm.cookbook.Fields.IP);                // 将ip 转换为json                JSONObject json = resolver.resolveIP(ip);                // 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象                String city = (String) json.get(storm.cookbook.Fields.CITY);                String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);                collector.emit(new Values(country, city));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {                // 确定了我们这次输出元祖的格式                outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,                                storm.cookbook.Fields.CITY));        }}

以上 GeographyBolt 完成了从 IP 到城市(CITY)和国家(COUNTRY)的转换。

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map;public class GeoStatsBolt extends BaseRichBolt {        private class CountryStats {                //                private int countryTotal = 0;                private static final int COUNT_INDEX = 0;                private static final int PERCENTAGE_INDEX = 1;                private String countryName;                public CountryStats(String countryName) {                        this.countryName = countryName;                }                private Map> cityStats = new HashMap>();                /**                 * @param cityName                 */                public void cityFound(String cityName) {                        countryTotal++;                        // 已经有了值,一个加1的操作                        if (cityStats.containsKey(cityName)) {                                cityStats.get(cityName)                                                .set(COUNT_INDEX,                                                                cityStats.get(cityName).get(COUNT_INDEX)                                                                                .intValue() + 1);                                // 没有值的时候                        } else {                                List list = new LinkedList();                                list.add(1);                                list.add(0);                                cityStats.put(cityName, list);                        }                        double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)                                        / (double) countryTotal;                        cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) 
percent);                }                /**                 * @return 拿到的国家总数                 */                public int getCountryTotal() {                        return countryTotal;                }                /**                 * @param cityName  依据传入的城市名称,拿到城市总数                 * @return                 */                public int getCityTotal(String cityName) {                        return cityStats.get(cityName).get(COUNT_INDEX).intValue();                }                                public String toString() {                        return "Total Count for " + countryName + " is "                                        + Integer.toString(countryTotal) + "\n" + "Cities:  "                                        + cityStats.toString();                }        }        private OutputCollector collector;        // CountryStats 是一个内部类的对象        private Map stats = new HashMap();        @Override        public void prepare(Map map, TopologyContext topologyContext,                        OutputCollector outputCollector) {                this.collector = outputCollector;        }        @Override        public void execute(Tuple tuple) {                String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);                String city = tuple.getStringByField(storm.cookbook.Fields.CITY);                // 如果国家不存在的时候,新增加一个国家,国家的统计                if (!stats.containsKey(country)) {                        stats.put(country, new CountryStats(country));                }                // 这里拿到新的统计,cityFound 是拿到某个城市的值                stats.get(country).cityFound(city);                collector.emit(new Values(country,                                stats.get(country).getCountryTotal(), city, stats.get(country)                                                .getCityTotal(city)));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {                outputFieldsDeclarer.declare(new 
backtype.storm.tuple.Fields(                                storm.cookbook.Fields.COUNTRY,                                storm.cookbook.Fields.COUNTRY_TOTAL,                                storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));        }}

以上 GeoStatsBolt 负责地理位置的统计。下面附上程序中用到的其他辅助类:

package storm.cookbook;

/**
 * Tuple field names shared by the spout and bolts of the click-statistics topology.
 */
public class Fields {

    // Raw click data: ClickSpout declares these as its output fields.
    public static final String IP = "ip";
    public static final String URL = "url";
    public static final String CLIENT_KEY = "clientKey";

    // Geography: COUNTRY_NAME is the key read from the IP resolver's JSON reply;
    // COUNTRY and CITY are the fields emitted downstream.
    public static final String COUNTRY = "country";
    public static final String COUNTRY_NAME = "country_name";
    public static final String CITY = "city";

    // Whether a visit is the first one for a url:clientKey pair.
    public static final String UNIQUE = "unique";

    // Running total of hits for a country.
    public static final String COUNTRY_TOTAL = "countryTotal";

    // Running total of hits for a city.
    public static final String CITY_TOTAL = "cityTotal";

    // Running total of all visits.
    public static final String TOTAL_COUNT = "totalCount";

    // Running total of unique visits.
    public static final String TOTAL_UNIQUE = "totalUnique";
}
package storm.cookbook;import org.json.simple.JSONObject;import org.json.simple.JSONValue;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.Serializable;import java.net.MalformedURLException;import java.net.URL;import java.net.URLConnection;public class HttpIPResolver implements IPResolver, Serializable {        static String url = "http://api.hostip.info/get_json.php";        @Override        public JSONObject resolveIP(String ip) {                URL geoUrl = null;                BufferedReader in = null;                try {                        geoUrl = new URL(url + "?ip=" + ip);                        URLConnection connection = geoUrl.openConnection();                        in = new BufferedReader(new InputStreamReader(                                        connection.getInputStream()));                        String inputLine;                        JSONObject json = (JSONObject) JSONValue.parse(in);                        in.close();                        return json;                } catch (IOException e) {                        e.printStackTrace();                } finally {                        // 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作                        if (in != null) {                                try {                                        in.close();                                } catch (IOException e) {                                }                        }                }                return null;        }}
package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Strategy for turning an IP address into a JSON description of its location.
 */
public interface IPResolver {

    /**
     * Resolves an IP address.
     *
     * @param ip the IP address to look up
     * @return a JSON object describing the address; callers such as GeographyBolt read the
     *     {@code city} and {@code country_name} keys from it. Implementations may return
     *     {@code null} when the lookup fails.
     */
    JSONObject resolveIP(String ip);
}

到此,相信大家对"Storm流方式的统计系统怎么实现"有了更深的了解,不妨来实际操作一番吧!这里是千家信息网,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0