spark+kafka+redis统计网站访问者IP
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。1、kafka版本是最新的0.10.0.02、spark版本是1.613、下载对应的spark-streaming-kafka-a
千家信息网最后更新 2025年01月24日spark+kafka+redis统计网站访问者IP
*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。
1、kafka版本是最新的0.10.0.0
2、spark版本是1.61
3、下载对应的spark-streaming-kafka-assembly_2.10-1.6.1.jar放到spark目录下的lib目录下
下载地址 https://repo1.maven.org/maven2/org/apache/spark/
4、利用flume将nginx日志写入到kafka(后续补充)
5、编写python脚本,命名为test_spark_collect_ip.py
# coding:utf-8__author__ = 'chenhuachao''''利用pyspark连接kafka,统计访问者的IP信息,做出的一个实时的防采集'''import sysreload(sys)sys.setdefaultencoding('utf-8')import redisimport datetimefrom pyspark.streaming.kafka import KafkaUtilsfrom pyspark.streaming import StreamingContextfrom pyspark import SparkConf, SparkContextdef parse(logstring): try: infodict = eval(logstring.encode('utf-8')) ip =infodict.get('ip') assert infodict['tj-event'] == 'onload' assert ip return (ip) except: return ()def insert_redis(rdd): '''将符合条件的结果写入到redis''' conn = redis.Redis(host='redis的IP',port=6380) for i,j in rdd.collect(): print i,j if j >=3 and j != "": conn.sadd('cheating_ip_set_{0}'.format(datetime.datetime.now().strftime("%Y%m%d")),i) conn.expire('cheating_ip_set',86400)if __name__ == "__main__": topic = 'statis-detailinfo-pageevent' sc = SparkContext(appName="pyspark_kafka_streaming_chc") ssc = StreamingContext(sc,10) checkpointDirectory = '/tmp/checkpoint/cp3' ssc.checkpoint(checkpointDirectory) kvs = KafkaUtils.createDirectStream(ssc,['statis-detailinfo-pageevent'],kafkaParams={"auto.offset.reset": "largest","metadata.broker.list":"kafka-IP:9092,kafka-IP:9092"}) #kvs.map(lambda line:line[1]).map(lambda x:parse(x)).pprint() #这里用到了一个滑动窗口的概念,需要深入了解的可以参考http://www.kancloud.cn/kancloud/spark-programming-guide/51567 #ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKey(lambda ips,num:ips+num) ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKeyAndWindow(lambda ips,num:ips+num,30,10) # 预处理,如果需要多次计算则使用缓存 # 传入rdd进行循坏,即用于foreachRdd(insertRedis) ipcount.foreachRDD(insert_redis) # 各节点的rdd的循坏 # wordCounts.foreachRDD(lambda rdd: rdd.foreach(sendRecord)) ssc.start()
6、执行命令
bin/spark-submit --jars lib/spark-streaming-kafka-assembly_2.10-1.6.1.jar test_spark_collect_ip.py
7、输出界面
8、更多信息,请参考spark的官网http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#module-pyspark.streaming.kafka
信息
实时
日志
版本
目录
参考
网站
访问者
统计
命令
地址
更多
条件
概念
界面
目的
结果
缓存
脚本
节点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发适合哪种成本核算
网络安全培训主持
端游方舟怎么进之前的服务器
河南易智胜网络技术有限公司
smtp服务器名称在哪里找
美国专利文本与图像数据库
广州市梦享网络技术公司
tp710n打印机服务器
茁壮网络安全工程师
数据库专业导论论文
数据库工程师和运维工程师
识别网络技术的方法
榆树先进网络技术服务品质保障
数据库与数据库表的区别
无法用流量打开服务器
计算机网络技术吃香不吃香
北京大型软件开发公司招聘
宣城雌核网络技术有限公司
腾讯千帆自定义配置数据库
在网络安全和信息
考研信息网络安全的要求
服务器只开80端口安全吗
数据库系统概论文献综述
CDN软件开发公司
有哪些网络安全的杂志
微软服务器有多大
软件开发和使用所涉及的判断题
杭州珍林网络技术
为什么数据库还原失败
腾讯千帆自定义配置数据库