千家信息网

Rewriting Logstash in Python: Cleaning the Nginx Access Log and Loading It into Elasticsearch

Published: 2024-12-13 Author: 千家信息网 editor
Last updated: 2024-12-13

Step 1. Change the Nginx log format to JSON
Set nginx's log_format to the following (edit /etc/nginx/nginx.conf):
log_format main '{"@timestamp":"$time_iso8601","host":"$server_addr","clientip":"$remote_addr","size":$body_bytes_sent,"responsetime":$request_time,"upstreamtime":"$upstream_response_time","upstreamhost":"$upstream_addr","http_host":"$host","url":"$uri","xff":"$http_x_forwarded_for","referer":"$http_referer","agent":"$http_user_agent","status":"$status"}';
After reloading nginx, entries in access.log look like this:
{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX","clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080","http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png","xff":"111.22.65.171","referer":"-","agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}
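Because each entry is now a JSON object, a line can also be parsed with Python's standard json module instead of by character position. The following is a minimal sketch and not part of the original script: the helper name parse_line is hypothetical, and it assumes each line is valid JSON. A user agent containing a double quote would break that assumption unless nginx escapes it, for example with the escape=json parameter of log_format (available in nginx 1.11.8 and later).

```python
import json

# The sample entry shown above, as one line of access.log.
SAMPLE = ('{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX",'
          '"clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,'
          '"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080",'
          '"http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png",'
          '"xff":"111.22.65.171","referer":"-",'
          '"agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}')


def parse_line(line):
    """Return the log entry as a dict, or None if the line is not valid JSON."""
    try:
        return json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None


entry = parse_line(SAMPLE)
# size and responsetime are unquoted in log_format, so they arrive as numbers;
# status is quoted, so it stays a string.
print(entry["clientip"], entry["size"], entry["status"])
```

Returning None for a malformed line lets a caller skip it rather than abort the whole import.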
Step 2. Write the Python program

# -*- coding: utf-8 -*-
'''
  By Willson Luo at 2017/11/23 v1.0
'''
import iso8601
from elasticsearch import Elasticsearch
from geoip import geolite2

# connect to elasticsearch database
# (without authentication: es = Elasticsearch("localhost:9200"))
es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}], http_auth=('elastic', 'xxxxx'))

# nginx column name
#title = ['@timestamp','host','clientip','size','responsetime','upstreamtime','upstreamhost','http_host','url','xff','referer','agent','status']

# nginx access log
ngxlog  = 'access.log'
ngxdata = open(ngxlog).readlines()

for a1 in range(len(ngxdata)):
    # Splitting on '"' puts the values at fixed positions:
    # 3 @timestamp, 7 host, 11 clientip, 14 size, 16 responsetime, 19 upstreamtime,
    # 23 upstreamhost, 27 http_host, 31 url, 35 xff, 39 referer, 43 agent, 47 status
    step1 = ngxdata[a1].strip().split("\"")

    # nginx data (json format), one fresh dict per log line
    ngxjson = {}
    abc = iso8601.parse_date(step1[3])
    bcd = abc.isoformat()            # e.g. 2017-12-13T17:29:49+08:00
    cde = abc.strftime('%Y%m%d')     # one index per day
    ngxindex = 'logstash-weixin-nginx-access-' + cde
    ngxjson['@timestamp'] = bcd
    ngxjson['host'] = step1[7]
    # size and responsetime are unquoted in the log, so strip the ':' and ','
    ngxjson['size'] = step1[14].replace(":", "").replace(",", "")
    ngxjson['responsetime'] = step1[16].replace(":", "").replace(",", "")
    ngxjson['upstreamtime'] = step1[19]
    ngxjson['upstreamhost'] = step1[23]
    ngxjson['http_host'] = step1[27]

    # prefer the first X-Forwarded-For address; fall back to the remote address
    if step1[35] == "-":
        ipaddr = step1[11]
    else:
        ipaddr = step1[35].split(",")[0]
    ngxjson['clientip'] = ipaddr

    # reduce the user agent string to a coarse label
    if "Apple" in step1[43]:
        ngxjson['agent'] = "Apple"
    elif "WeChat" in step1[43]:
        ngxjson['agent'] = "WeChat"
    elif "curl" in step1[43]:
        ngxjson['agent'] = "Linux"
    elif "Alibaba" in step1[43]:
        ngxjson['agent'] = "Aliyun"
    elif "Android" in step1[43]:
        ngxjson['agent'] = "Android"
    elif "MSIE" in step1[43]:
        ngxjson['agent'] = "IE"
    elif "Firefox" in step1[43]:
        ngxjson['agent'] = "Firefox"
    elif "Windows" in step1[43]:
        ngxjson['agent'] = "Windows"
    elif "Apache-Http" in step1[43]:
        ngxjson['agent'] = "Apache"
    else:
        ngxjson['agent'] = step1[43]
    ngxjson['status'] = step1[47]

    # GeoIP lookup; geolite2.lookup() returns None for addresses it does not know
    info = geolite2.lookup(ipaddr)
    match = info.get_info_dict() if info else {}
    geoip = {}
    if 'location' in match:
        # [longitude, latitude] order
        geoip['location'] = [match['location']['longitude'], match['location']['latitude']]
    if 'city' in match:
        city = match['city']['names']['en']
    else:
        city = "-"
    if 'country' in match:
        country = match['country']['names']['en']
    else:
        country = "-"
    if 'subdivisions' in match:
        subdivisions = match['subdivisions'][0]['names']['en']
    else:
        subdivisions = "-"
    ngxjson['geoip']        = geoip
    ngxjson['country']      = country
    ngxjson['subdivisions'] = subdivisions
    ngxjson['city']         = city
    ngxjson['possition']    = country + "-" + subdivisions + "-" + city

    print(a1, ngxjson)
    es.index(index=ngxindex, doc_type="logs", body=ngxjson)
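The if/elif chain and the one-document-at-a-time es.index call above can be written more compactly. The sketch below is an alternative under stated assumptions, not the author's code: classify_agent, index_name, client_ip, and to_actions are hypothetical helpers, the rule order mirrors the chain above, and lines are assumed to be valid JSON. The generated dicts use the _index and _source keys accepted by elasticsearch.helpers.bulk, so they could be passed to helpers.bulk(es, actions); on Elasticsearch versions that still use mapping types, a "_type": "logs" key would also be needed to match the doc_type used above.

```python
import json
from datetime import datetime

# (substring, label) pairs checked in order, mirroring the if/elif chain above.
AGENT_RULES = [
    ("Apple", "Apple"), ("WeChat", "WeChat"), ("curl", "Linux"),
    ("Alibaba", "Aliyun"), ("Android", "Android"), ("MSIE", "IE"),
    ("Firefox", "Firefox"), ("Windows", "Windows"), ("Apache-Http", "Apache"),
]


def classify_agent(agent):
    """Return the label of the first matching rule, or the raw agent string."""
    for needle, label in AGENT_RULES:
        if needle in agent:
            return label
    return agent


def index_name(timestamp):
    """Build the daily index name from the date part of an ISO 8601 timestamp."""
    day = datetime.strptime(timestamp[:10], "%Y-%m-%d")
    return "logstash-weixin-nginx-access-" + day.strftime("%Y%m%d")


def client_ip(entry):
    """Prefer the first X-Forwarded-For address, else the remote address."""
    xff = entry.get("xff", "-")
    return entry["clientip"] if xff == "-" else xff.split(",")[0]


def to_actions(lines):
    """Yield one bulk action per log line (assumes each line is valid JSON)."""
    for line in lines:
        entry = json.loads(line)
        doc = dict(entry)
        doc["clientip"] = client_ip(entry)
        doc["agent"] = classify_agent(entry["agent"])
        yield {"_index": index_name(entry["@timestamp"]), "_source": doc}


# A shortened, made-up log line for illustration.
sample = ('{"@timestamp":"2017-12-13T17:29:49+08:00","clientip":"120.76.XX.XXX",'
          '"xff":"111.22.65.171","agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 '
          'Darwin/16.5.0","status":"200"}')
actions = list(to_actions([sample]))
print(actions[0]["_index"], actions[0]["_source"]["agent"])
```

Keeping the rules in a list makes their order explicit, which matters because an agent string can contain more than one of the substrings.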

Step 3. Present the data in Kibana
1> First register the index in Kibana (this is usually the first thing Kibana asks you to set up): Kibana --> Management --> Kibana (Index Patterns)
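The script in Step 2 writes one index per day, so the index pattern entered in Kibana needs a wildcard that covers all of them. Assuming the index prefix from Step 2, a pattern such as logstash-weixin-nginx-access-* would do. The sketch below only illustrates that kind of wildcard matching with Python's fnmatch (Kibana does its own matching), and the index names in the list are made-up examples.

```python
from fnmatch import fnmatch

# Candidate index pattern, derived from the prefix used in Step 2.
PATTERN = "logstash-weixin-nginx-access-*"

# Hypothetical index names that might exist on the cluster.
indices = [
    "logstash-weixin-nginx-access-20171212",
    "logstash-weixin-nginx-access-20171213",
    ".kibana",
]

matched = [name for name in indices if fnmatch(name, PATTERN)]
print(matched)
```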

2> Build the views you need: Kibana --> Visualize (what to build here is a matter of taste)

Step 4. Build a Dashboard (that is, drag in the content created in Visualize)

This is my first blog post, so it probably contains quite a few mistakes and omissions. Corrections are welcome. Thanks.
