
How to Deploy an ELK Log Analysis System on CentOS 7.6


This article walks through how to deploy an ELK log analysis system on CentOS 7.6. It is shared here as a practical reference; hopefully you will get something useful out of it.

Download Elasticsearch

Create the elk user and grant permissions

useradd elk
chown -R elk:elk /home/elk/elasticsearch
chown -R elk:elk /home/elk/elasticsearch2
chown -R elk:elk /home/elk/elasticsearch3
mkdir -p /home/eladata
mkdir -p /var/log/elk
chown -R elk:elk /home/eladata
chown -R elk:elk /var/log/elk
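Elasticsearch also expects the kernel setting vm.max_map_count to be at least 262144 when it binds to a non-loopback address. Raising it is not shown in the original listing; a minimal sketch is:

# Elasticsearch refuses to start in production mode if vm.max_map_count is too low
sysctl -w vm.max_map_count=262144
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf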

Master node

Extract Elasticsearch and modify the configuration file

/home/elk/elasticsearch/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node0
node.master: true
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9200
transport.tcp.port: 9301
discovery.zen.minimum_master_nodes: 1
cluster.initial_master_nodes: ["node0"]

Manual start command

su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
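To confirm the node came up, a quick sanity check against the HTTP port configured above (192.168.1.70:9200) can be run with curl:

# basic node info and cluster health, assuming the address and port from elasticsearch.yml above
curl 'http://192.168.1.70:9200/'
curl 'http://192.168.1.70:9200/_cluster/health?pretty'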

Startup unit file: elasticsearch.service

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch
Environment=ES_PATH_CONF=/home/elk/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
[root@localhost system]#
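With the unit file in place, the instance can be managed through systemd; the same commands apply to the elasticsearch2.service and elasticsearch3.service units below:

# reload systemd so it picks up the new unit, then enable and start the node
systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
systemctl status elasticsearch.service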

Node1

/home/elk/elasticsearch2/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node1
node.master: false
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
transport.tcp.port: 9303
http.port: 9302
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
[root@localhost config]#

Manual start command

su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch -d'

Startup unit file: elasticsearch2.service

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch2.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch2
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch2
Environment=ES_PATH_CONF=/home/elk/elasticsearch2/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch2
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
[root@localhost system]#

Node2

/home/elk/elasticsearch3/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node2
node.attr.rack: r1
node.master: false
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9203
transport.tcp.port: 9304
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
discovery.zen.minimum_master_nodes: 1
[root@localhost config]#

Manual start command

su elk -l -c '/home/elk/elasticsearch3/bin/elasticsearch -d'

Startup unit file: elasticsearch3.service

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch3.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch3
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch3
Environment=ES_PATH_CONF=/home/elk/elasticsearch3/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch3
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch3/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
[root@localhost system]#
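Once all three instances are running, cluster membership can be verified against the master's HTTP port; with the addresses configured above, node0, node1 and node2 should all be listed:

# list the nodes that have joined the cluster
curl 'http://192.168.1.70:9200/_cat/nodes?v'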

Download Logstash

The directory is as follows; the default configuration is fine.

[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]#

Manual start command

./logstash -f ../dev.conf
nohup ./logstash -f ../dev.conf &
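Before running it in the background, the pipeline file can be syntax-checked first; a quick sketch using Logstash's standard test flag:

# validate the pipeline configuration and exit without starting it
./logstash -f ../dev.conf --config.test_and_exit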

Download Kibana

The configuration file is as follows

[root@localhost config]# pwd
/home/elk/kibana/config
[root@localhost config]# grep -v "^#" kibana.yml
server.host: "192.168.1.70"
elasticsearch.hosts: ["http://192.168.1.70:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"

Manual start command

./kibana
nohup ./kibana &

Kibana startup unit file

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kibana.service
[Unit]
Description=Kibana Server Manager

[Service]
ExecStart=/home/elk/kibana/bin/kibana

[Install]
WantedBy=multi-user.target
[root@localhost system]#
The port is 5601; access it at 192.168.1.70:5601.
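As a sketch, Kibana can then be started through systemd and checked with its status API (assuming the unit file and address above):

systemctl daemon-reload
systemctl start kibana.service
# returns JSON describing Kibana's overall state once it is up
curl 'http://192.168.1.70:5601/api/status'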

Install elasticsearch-head

yum install git npm
git clone https://github.com/mobz/elasticsearch-head.git
[root@localhost elasticsearch-head]# pwd
/home/elk/elasticsearch-head
[root@localhost elasticsearch-head]#

Start

npm install
npm run start
nohup npm run start &
curl -XPUT '192.168.1.70:9200/book'
Then open 192.168.1.70:9100 in a browser to use the head interface.

Download Kafka

Modify the configuration file as follows

[root@localhost config]# pwd
/home/elk/kafka/config
[root@localhost config]# grep -v "^#" server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.70:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
[root@localhost config]#
Configure and start the ZooKeeper bundled with Kafka

Manual start

[root@localhost bin]# pwd
/home/elk/kafka/bin
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties

Starting ZooKeeper with systemctl

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat zookeeper.service
[Service]
Type=forking
SyslogIdentifier=zookeeper
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.properties
ExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh
[root@localhost system]#
Start the Kafka service

Manual start

./kafka-server-start.sh ../config/server.properties

Starting Kafka with systemctl

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kafka.service
[Unit]
Description=Apache kafka
After=network.target

[Service]
Type=simple
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/kafka-server-start.sh /home/elk/kafka/config/server.properties
ExecStop=/home/elk/kafka/bin/kafka-server-stop.sh
[root@localhost system]#
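ZooKeeper must be running before the broker starts; a sketch of bringing both up with the unit files above:

systemctl daemon-reload
# start ZooKeeper first, then the Kafka broker
systemctl start zookeeper.service
systemctl start kafka.service
systemctl status kafka.service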
Test Kafka

Create a topic named test

./kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test

List the topics in Kafka

./kafka-topics.sh --list  --zookeeper 192.168.1.70:2181

Produce messages to the test topic

./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test

Consume messages from the test topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning

If the messages you produce show up on the consumer side, Kafka is working correctly.
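The topic's partition and replica assignment can also be inspected (a quick sketch, assuming the same ZooKeeper address as above):

./kafka-topics.sh --describe --zookeeper 192.168.1.70:2181 --topic test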

Install Filebeat on the target machine

Install version 6.5.

[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
    tag: 54_tcp_catalina_out
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
    tag: 54_web_catalina_out

name: 192.168.1.54

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

output.kafka:
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
[root@localhost filebeat]#
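A sketch of starting Filebeat from the install directory shown above; -e logs to stderr for a first test, and the nohup form keeps it running in the background:

cd /usr/local/filebeat
./filebeat -e -c filebeat.yml
nohup ./filebeat -c filebeat.yml &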

After installation, go back to Logstash and edit its configuration files.

Logstash configuration

[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}

filter {
  if [fields][tag] == "jpwebmap" {
    json {
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }

  if [fields][tag] == "54_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "54_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "51_nginx80_access_log" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr", "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}"]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent", "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

output {
  if [fields][tag] == "wori" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }
  if [fields][tag] == "54_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_tcp_catalina_out"
    }
  }
  if [fields][tag] == "54_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_web_catalina_out"
    }
  }
  if [fields][tag] == "55_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_tcp_catalina_out"
    }
  }
  if [fields][tag] == "55_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_web_catalina_out"
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    stdout {}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}
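Once Filebeat, Kafka and Logstash are all running, the per-tag indices defined in the output section should start appearing in Elasticsearch; a quick check:

# list all indices and their document counts
curl 'http://192.168.1.70:9200/_cat/indices?v'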

Other configuration files

index.conf

filter {
  mutate {
    add_field => { "spstr" => "%{[log][file][path]}" }
  }
  mutate {
    split => ["spstr", "/"]
    # save the last element of the array as the api_method.
    add_field => ["src", "%{[spstr][-1]}"]
  }
  mutate {
    remove_field => [ "friends", "ecs", "agent", "spstr" ]
  }
}

java.conf

filter {
  if [fields][tag] == "java" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  } # End if
}

kafkainput.conf

input {
  kafka {
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat"]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat"]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}

#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java"]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }
#}

nginx.conf

filter {
  if [fields][tag] == "nginx-access" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr", "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}"]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent", "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  } # endif
}

output.conf

output {
  if [fields][tag] == "nginx-access" {
    stdout {}
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.67:9200"]
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      index => "%{[host][name]}-%{[src]}"
    }
  }
}
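Since Logstash accepts a directory as its -f argument and concatenates every file in it, these split files can be run together as one pipeline. The conf.d path below is only an illustrative layout, not one from the original article:

mkdir -p /home/elk/logstash/conf.d    # hypothetical directory holding the split configs above
./logstash -f /home/elk/logstash/conf.d/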

That wraps up "How to Deploy an ELK Log Analysis System on CentOS 7.6". Hopefully the content above is helpful and has taught you something new; if you found the article worthwhile, please share it with others.
