千家信息网

如何在nginx中使用elasticsearch导入日志

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,如何在nginx中使用elasticsearch导入日志?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1、配置nginx日志格式log_
千家信息网最后更新 2025年02月07日如何在nginx中使用elasticsearch导入日志

如何在nginx中使用elasticsearch导入日志?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

1、配置nginx日志格式

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '             '$status $body_bytes_sent $http_referer '             '"$http_user_agent" '            '"$connection" '            '"$http_cookie" '            '$request_time '            '$upstream_response_time';

2、安装配置filebeat,启用nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/localcd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeatcd /usr/local/filebeat

启用nginx模块

./filebeat modules enable nginx

查看模块

./filebeat modules list

创建配置文件

vim /usr/local/filebeat/blog_module_logstash.ymlfilebeat.modules:- module: nginx access:  enabled: true  var.paths: ["/home/weblog/blog.cnfol.com_access.log"] #error: # enabled: true # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]output.logstash: hosts: ["192.168.15.91:5044"]

启动filebeat

./filebeat -c blog_module_logstash.yml -e

3、配置logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/localcd /usr/local;ln -s logstash-6.2.4 logstash创建一个nginx日志的pipline文件cd /usr/local/logstash

logstash内置的模板目录

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

编辑 grok-patterns 添加一个支持多ip的正则

FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

官方grok

http://grokdebug.herokuapp.com/patterns#

创建logstash pipline配置文件

#input {# stdin {}#}# 从filebeat接受数据input { beats { port => 5044 host => "0.0.0.0" }}filter { # 添加一个调试的开关 mutate{add_field => {"[@metadata][debug]"=>true}} grok { # 过滤nginx日志 #match => { "message" => "%{NGINXACCESS_TEST2}" } #match => { "message" => '%{IPORHOST:clientip} # (?[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' } #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }    match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' } } # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp ruby {  #code => "event.set('@read_timestamp',event.get('@timestamp'))" #将时区改为东8区 code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)" } # 将nginx的日志记录时间格式化 # 格式化时间 20/May/2015:21:05:56 +0000 date { locale => "en" match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"] } # 将bytes字段由字符串转换为数字 mutate { convert => {"bytes" => "integer"} } # 将cookie字段解析成一个json #mutate { # gsub => ["cookies",'\;',','] #}  # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip if[http_x_forwarded_for] =~ ", "{     ruby {         code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'        }    } # 解析ip,获得ip的地理位置 geoip { source => "http_x_forwarded_for" # # 只获取ip的经纬度、国家、城市、时区 fields => ["location","country_name","city_name","region_name"]  } # 将agent字段解析,获得浏览器、系统版本等具体信息 useragent { source => "agent" target => "useragent" } #指定要删除的数据 #mutate{remove_field=>["message"]} # 根据日志名设置索引名的前缀 ruby { code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])' }  # 将@timestamp 格式化为2019.04.23 ruby { code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))' } # 设置输出的默认索引名 mutate { add_field => {  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}" } } # 将cookies字段解析成json# mutate {# gsub => [#  "cookies", ";", ",",#  "cookies", "=", ":"# ]# #split => {"cookies" => ","}# }# json_encode {# source => "cookies"# target => "cookies_json"# }# mutate {# gsub => [#  "cookies_json", ',', '","',#  "cookies_json", ':', '":"'# ]# }# json {# source => "cookies_json"# target => "cookies2"# } # 如果grok解析存在错误,将错误独立写入一个索引 if "_grokparsefailure" in [tags] { #if "_dateparsefailure" in [tags] { mutate {  replace => {  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"  } } # 如果不存在错误就删除message }else{ mutate{remove_field=>["message"]} }}output { if [@metadata][debug]{ # 输出到rubydebuyg并输出metadata stdout{codec => rubydebug{metadata => true}} }else{ # 将输出内容转换成 "." stdout{codec => dots}  # 将输出到指定的es elasticsearch {  hosts => ["192.168.15.160:9200"]  index => "%{[@metadata][index]}"  document_type => "doc" }  }}

启动logstash

nohup bin/logstash -f test_pipline2.conf &

看完上述内容,你们掌握如何在nginx中使用elasticsearch导入日志的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0