spark+kafka+redis统计网站访问者IP
发表于:2024-09-23 作者:千家信息网编辑
千家信息网最后更新 2024年09月23日,*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。1、kafka版本是最新的0.10.0.02、spark版本是1.613、下载对应的spark-streaming-kafka-a
千家信息网最后更新 2024年09月23日spark+kafka+redis统计网站访问者IP
*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。
1、kafka版本是最新的0.10.0.0
2、spark版本是1.61
3、下载对应的spark-streaming-kafka-assembly_2.10-1.6.1.jar放到spark目录下的lib目录下
下载地址 https://repo1.maven.org/maven2/org/apache/spark/
4、利用flume将nginx日志写入到kafka(后续补充)
5、编写python脚本,命名为test_spark_collect_ip.py
# coding:utf-8__author__ = 'chenhuachao''''利用pyspark连接kafka,统计访问者的IP信息,做出的一个实时的防采集'''import sysreload(sys)sys.setdefaultencoding('utf-8')import redisimport datetimefrom pyspark.streaming.kafka import KafkaUtilsfrom pyspark.streaming import StreamingContextfrom pyspark import SparkConf, SparkContextdef parse(logstring): try: infodict = eval(logstring.encode('utf-8')) ip =infodict.get('ip') assert infodict['tj-event'] == 'onload' assert ip return (ip) except: return ()def insert_redis(rdd): '''将符合条件的结果写入到redis''' conn = redis.Redis(host='redis的IP',port=6380) for i,j in rdd.collect(): print i,j if j >=3 and j != "": conn.sadd('cheating_ip_set_{0}'.format(datetime.datetime.now().strftime("%Y%m%d")),i) conn.expire('cheating_ip_set',86400)if __name__ == "__main__": topic = 'statis-detailinfo-pageevent' sc = SparkContext(appName="pyspark_kafka_streaming_chc") ssc = StreamingContext(sc,10) checkpointDirectory = '/tmp/checkpoint/cp3' ssc.checkpoint(checkpointDirectory) kvs = KafkaUtils.createDirectStream(ssc,['statis-detailinfo-pageevent'],kafkaParams={"auto.offset.reset": "largest","metadata.broker.list":"kafka-IP:9092,kafka-IP:9092"}) #kvs.map(lambda line:line[1]).map(lambda x:parse(x)).pprint() #这里用到了一个滑动窗口的概念,需要深入了解的可以参考http://www.kancloud.cn/kancloud/spark-programming-guide/51567 #ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKey(lambda ips,num:ips+num) ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKeyAndWindow(lambda ips,num:ips+num,30,10) # 预处理,如果需要多次计算则使用缓存 # 传入rdd进行循坏,即用于foreachRdd(insertRedis) ipcount.foreachRDD(insert_redis) # 各节点的rdd的循坏 # wordCounts.foreachRDD(lambda rdd: rdd.foreach(sendRecord)) ssc.start()
6、执行命令
bin/spark-submit --jars lib/spark-streaming-kafka-assembly_2.10-1.6.1.jar test_spark_collect_ip.py
7、输出界面
8、更多信息,请参考spark的官网http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#module-pyspark.streaming.kafka
信息
实时
日志
版本
目录
参考
网站
访问者
统计
命令
地址
更多
条件
概念
界面
目的
结果
缓存
脚本
节点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库数据类型单选框
中文法律数据库的作用
眉山网络技术怎么代理
郑州app软件开发要多少钱
深圳口碑好的软件开发联系方式
河北新一代网络技术质量服务
网络技术演讲ppt
研究网络安全有什么收获
软件开发人员开发方向
软件开发买苹果笔记本哪一款
带nvme接口的服务器主板
网络安全约束力
软件开发成本由谁完成
小白数据库小米11u
h3c服务器管理接口
计算机报名网络技术
软件开发中兼容性问题
战地5服务器全满了怎么办
软件开发项目wbs图
软件开发项目实施复盘总结报告
如何获取串口服务器转发的数据
绵阳网络安全服务
天津高职院计算机网络技术就业
数据库禁用所有约束
怎么查看数据库字符集
曲靖gpu云服务器经销商
战地5服务器全满了怎么办
考研数据库技术技巧
dell服务器e5530
网络安全视频前奏