Python重写Logstash,把Nginx Access Log清洗后汇入Elastic DB
Step1. 修改Nginx的log格式(改为JSON格式)
把nginx的log_format改为以下的参数(修改/etc/nginx/nginx.conf):
log_format main '{"@timestamp":"$time_iso8601","host":"$server_addr","clientip":"$remote_addr","size":$body_bytes_sent,"responsetime":$request_time,"upstreamtime":"$upstream_response_time","upstreamhost":"$upstream_addr","http_host":"$host","url":"$uri","xff":"$http_x_forwarded_for","referer":"$http_referer","agent":"$http_user_agent","status":"$status"}';
reload nginx后,看到access.log的格式如下:
{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX","clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080","http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png","xff":"111.22.65.171","referer":"-","agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}
Step2. 编写python程式
# -- coding: utf-8 --''' By Willson Luo at 2017/11/23 v1.0'''import pandas as pdimport json,time,datetime,iso8601from elasticsearch import Elasticsearchfrom geoip import geolite2# connect to elasticsearch databasees = Elasticsearch( "localhost:9200" )es = Elasticsearch(hosts=[{'host': 'localhost', 'port': '9200'}],httpauth=('elastic', 'xxxxx'))# nginx column name#title = ['@timestamp','host','clientip','size','responsetime','upstreamtime','upstreamhost','httphost','url','xff','referer','agent','status']# nginx access logngxlog = 'access.log'ngxdata = open(ngxlog).readlines()# nginx data(json format)ngxjson = {}for a1 in range(len(ngxdata)): step1 = ngxdata[a1].strip().split("\"") abc = iso8601.parsedate(step1[3]) bcd = abc.strftime('%Y-%m-%dT%H:%M:%S%Z') cde = abc.strftime('%Y%m%d') ngxindex = 'logstash-weixin-nginx-access-'+ cde ngxjson['@timestamp'] = bcd ngxjson['host'] = step1[7] ngxjson['size'] = step1[14].replace(":","").replace(",","") ngxjson['responsetime'] = step1[16].replace(":","").replace(",","") ngxjson['upstreamtime'] = step1[19] ngxjson['upstreamhost'] = step1[23] if step1[35] == "-": ngxjson['clientip'] = step1[11] ngxjson['httphost'] = step1[27] ipaddr = step1[11] else: ngxjson['clientip'] = step1[35].split(",")[0] ngxjson['httphost'] = step1[39] ipaddr = step1[35].split(",")[0] if "Apple" in step1[43]: ngxjson['agent']="Apple" elif "WeChat" in step1[43]: ngxjson['agent']="WeChat" elif "curl" in step1[43]: ngxjson['agent']="Linux" elif "Alibaba" in step1[43]: ngxjson['agent']="Aliyun" elif "Android" in step1[43]: ngxjson['agent']="Android" elif "MSIE" in step1[43]: ngxjson['agent']="IE" elif "Firefox" in step1[43]: ngxjson['agent']="Firefox" elif "Windows" in step1[43]: ngxjson['agent']="Windows" elif "Apache-Http" in step1[43]: ngxjson['agent']="Apache" else: ngxjson['agent']= step1[43] ngxjson['status']= step1[47] location = geolite2.lookup(ipaddr).location match = geolite2.lookup(ipaddr).getinfodict() location = [] location.append(match['location']['longitude']) location.append(match['location']['latitude']) geoip = {} geoip['location'] = location if match.haskey('city'): city = match['city']['names']['en'] else: city = "-" if match.haskey('country'): country = match['country']['names']['en'] else: country = "-" if match.haskey('subdivisions'): subdivisions = match['subdivisions'][0]['names']['en'] else: subdivisions = "-" ngxjson['geoip'] = geoip ngxjson['country'] = country ngxjson['subdivisions'] = subdivisions ngxjson['city'] = city ngxjson['possition'] = country+"-"+subdivisions+"-"+city print a1,ngxjson es.index( index=ngxindex, doctype="logs", body=ngxjson )
Step3. 通过Kibana把数据呈现处理
1> 先在Kibana把index汇入(一般第一步就让你建立这个东西了)Kibana-->Management-->Kibana(Index Patterns)
2> 构建可用视图Kibana--> Visualize(这个东西比较见人见智)
Step4. 构建Dashboard(就是把Visualize的内容拖进来)
第一次写Blog,估计错漏不少,麻烦指正,谢谢