How to Deploy the ELK Log Analysis System on CentOS 7.6
Published: 2024-11-27  Author: 千家信息网 editor
千家信息网, last updated 2024-11-27
This article walks through deploying the ELK log analysis system on CentOS 7.6: three Elasticsearch nodes on one host, Logstash, Kibana, elasticsearch-head, Kafka with ZooKeeper, and Filebeat on the target machines.
Download Elasticsearch
Create the elk user and give it ownership of the directories
useradd elk
chown -R elk:elk /home/elk/elasticsearch
chown -R elk:elk /home/elk/elasticsearch2
chown -R elk:elk /home/elk/elasticsearch3
mkdir -p /home/eladata
mkdir -p /var/log/elk
chown -R elk:elk /home/eladata
chown -R elk:elk /var/log/elk
Master node
Unpack Elasticsearch and edit the configuration file
/home/elk/elasticsearch/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node0
node.master: true
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9200
transport.tcp.port: 9301
discovery.zen.minimum_master_nodes: 1
cluster.initial_master_nodes: ["node0"]
Manual start command
su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
Unit file elasticsearch.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch
Environment=ES_PATH_CONF=/home/elk/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
Node1
/home/elk/elasticsearch2/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node1
node.master: false
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
transport.tcp.port: 9303
http.port: 9302
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
Manual start command
su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch -d'
Unit file elasticsearch2.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch2.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch2
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch2
Environment=ES_PATH_CONF=/home/elk/elasticsearch2/config
Environment=PID_DIR=/var/run/elasticsearch2
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch2
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
Node2
/home/elk/elasticsearch3/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node2
node.attr.rack: r1
node.master: false
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9203
transport.tcp.port: 9304
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
discovery.zen.minimum_master_nodes: 1
Manual start command
su elk -l -c '/home/elk/elasticsearch3/bin/elasticsearch -d'
Unit file elasticsearch3.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch3.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch3
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch3
Environment=ES_PATH_CONF=/home/elk/elasticsearch3/config
Environment=PID_DIR=/var/run/elasticsearch3
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch3
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch3/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
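With all three unit files in place, the nodes can be enabled and checked from the shell. The following is a minimal sketch, assuming the unit names and HTTP ports shown above (9200, 9302, 9203); the function names and the ES_HOST variable are illustrative and not part of the original setup.

```shell
# Host the three nodes listen on (network.host in each elasticsearch.yml).
ES_HOST="${ES_HOST:-192.168.1.70}"

es_health_url() {
  # $1 = HTTP port of the node to query.
  printf 'http://%s:%s/_cluster/health?pretty\n' "$ES_HOST" "$1"
}

enable_es_nodes() {
  # Reload unit files, then enable and start each node (run as root).
  systemctl daemon-reload
  for unit in elasticsearch elasticsearch2 elasticsearch3; do
    systemctl enable "$unit" && systemctl start "$unit" || return 1
  done
}

check_es_nodes() {
  # Ask each node for the cluster health through its own HTTP port.
  for port in 9200 9302 9203; do
    curl -s --max-time 5 "$(es_health_url "$port")" || echo "no answer on port $port"
  done
}
```

Running enable_es_nodes followed by check_es_nodes should print one health document per node; if the nodes have joined the same cluster, number_of_nodes is expected to be 3 in each.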
Download Logstash
The directory is shown below; the default configuration is sufficient
[root@localhost logstash]# pwd
/home/elk/logstash
Manual start commands (foreground, or in the background with nohup)
./logstash -f ../dev.conf
nohup ./logstash -f ../dev.conf &
Download Kibana
The configuration file is as follows
[root@localhost config]# pwd
/home/elk/kibana/config
[root@localhost config]# grep -v "^#" kibana.yml
server.host: "192.168.1.70"
elasticsearch.hosts: ["http://192.168.1.70:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
Manual start commands (foreground, or in the background with nohup)
./kibana
nohup ./kibana &
Kibana unit file
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kibana.service
[Unit]
Description=Kibana Server Manager
[Service]
ExecStart=/home/elk/kibana/bin/kibana
[Install]
WantedBy=multi-user.target
Kibana listens on port 5601; open 192.168.1.70:5601 in a browser.
Install elasticsearch-head
yum install git npm
git clone https://github.com/mobz/elasticsearch-head.git
[root@localhost elasticsearch-head]# pwd
/home/elk/elasticsearch-head
Start
npm install
npm run start
nohup npm run start &
curl -XPUT '192.168.2.67:9100/book'
Open 192.168.2.67:9100 in a browser to reach elasticsearch-head.
Download Kafka
Edit the configuration file as follows
[root@localhost config]# pwd
/home/elk/kafka/config
[root@localhost config]# grep -v "^#" server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.70:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
Start the ZooKeeper bundled with Kafka
Manual start
[root@localhost bin]# pwd
/home/elk/kafka/bin
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
Start ZooKeeper with systemctl
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat zookeeper.service
[Service]
Type=forking
SyslogIdentifier=zookeeper
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.properties
ExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh
Start the Kafka service
Manual start
./kafka-server-start.sh ../config/server.properties
Start Kafka with systemctl
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kafka.service
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/kafka-server-start.sh /home/elk/kafka/config/server.properties
ExecStop=/home/elk/kafka/bin/kafka-server-stop.sh
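The broker connects to ZooKeeper at localhost:2181 (zookeeper.connect), yet kafka.service only orders itself after network.target. One way to express that dependency is a systemd drop-in. The sketch below is an optional addition and not part of the original setup; the function name and the drop-in file name after-zookeeper.conf are hypothetical.

```shell
write_kafka_dropin() {
  # $1 = systemd configuration root, normally /etc/systemd/system.
  dir="$1/kafka.service.d"
  mkdir -p "$dir" || return 1
  # Make kafka.service require zookeeper.service and start after it.
  cat > "$dir/after-zookeeper.conf" <<'EOF'
[Unit]
Requires=zookeeper.service
After=zookeeper.service
EOF
}
```

On the server this would be applied as root with write_kafka_dropin /etc/systemd/system followed by systemctl daemon-reload.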
Test Kafka
Create a topic named test
./kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test
List the topics in Kafka
./kafka-topics.sh --list --zookeeper 192.168.1.70:2181
Produce messages to the test topic
./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test
Consume messages from the test topic
./kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning
If the messages typed into the producer show up in the consumer, Kafka is working.
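The produce-and-consume check above can also be run non-interactively. This is a rough sketch, assuming the console scripts live in /home/elk/kafka/bin and the broker listens on 192.168.1.70:9092 as configured earlier; the function name and the KAFKA_BIN and BROKER variables are illustrative.

```shell
# Location of the Kafka scripts and the broker address used in this article.
KAFKA_BIN="${KAFKA_BIN:-/home/elk/kafka/bin}"
BROKER="${BROKER:-192.168.1.70:9092}"

kafka_round_trip() {
  # $1 = topic, $2 = message.
  # Produce one message, then read the topic from the beginning and
  # succeed only if that exact message comes back.
  topic="$1"
  msg="$2"
  printf '%s\n' "$msg" | "$KAFKA_BIN/kafka-console-producer.sh" \
    --broker-list "$BROKER" --topic "$topic" >/dev/null || return 1
  # --timeout-ms makes the consumer exit once no new message arrives in time.
  "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$BROKER" \
    --topic "$topic" --from-beginning --timeout-ms 10000 2>/dev/null \
    | grep -Fxq "$msg"
}
```

For example, kafka_round_trip test "hello-elk" && echo "kafka ok" reports success only when the message survives the round trip.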
Install Filebeat on the target machine
Install version 6.5
[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
    tag: 54_tcp_catalina_out
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
    tag: 54_web_catalina_out
name: 192.168.1.54
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
Once Filebeat is installed, edit the Logstash configuration file
Logstash configuration
[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}
filter {
  if [fields][tag] == "jpwebmap" {
    json {
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
  if [fields][tag] == "54_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "54_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "55_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "55_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr" , "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
}
output {
  if [fields][tag] == "wori" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }
  if [fields][tag] == "54_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_tcp_catalina_out"
    }
  }
  if [fields][tag] == "54_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_web_catalina_out"
    }
  }
  if [fields][tag] == "55_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_tcp_catalina_out"
    }
  }
  if [fields][tag] == "55_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_web_catalina_out"
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    stdout {}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}
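Filebeat publishes to the topic named in filebeat.yml, and Logstash only consumes the topics listed in dev.conf, so the two names have to match. The sketch below is a small consistency check under simple assumptions: it uses line-based matching rather than a real YAML or Logstash parser, and both function names are illustrative.

```shell
filebeat_topic() {
  # $1 = path to filebeat.yml. Print the value of the first "topic:" key,
  # with surrounding double quotes removed.
  sed -n 's/^[[:space:]]*topic:[[:space:]]*"\{0,1\}\([^"]*\)"\{0,1\}[[:space:]]*$/\1/p' "$1" | head -n 1
}

logstash_reads_topic() {
  # $1 = topic name, $2 = path to the Logstash config.
  # Succeed if an uncommented "topics =>" line contains the quoted topic.
  grep -E 'topics[[:space:]]*=>' "$2" | grep -v '^[[:space:]]*#' | grep -Fq "\"$1\""
}
```

A possible use is logstash_reads_topic "$(filebeat_topic filebeat.yml)" dev.conf && echo "topics match". Logstash can separately validate the syntax of the file with ./logstash -f ../dev.conf --config.test_and_exit.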
Other configuration files
index.conf
filter {
  mutate {
    add_field => { "spstr" => "%{[log][file][path]}" }
  }
  mutate {
    split => ["spstr" , "/"]
    # save the last element of the array as the api_method.
    add_field => ["src", "%{[spstr][-1]}" ]
  }
  mutate {
    remove_field => [ "friends", "ecs", "agent" , "spstr" ]
  }
}
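This filter copies the log file path into spstr, splits it on "/", and stores the last element in src, so src ends up holding the file name. The same idea can be sketched in shell as follows; the function name and the sample path are made up for illustration.

```shell
last_path_element() {
  # Equivalent in spirit to split => ["spstr", "/"] plus %{[spstr][-1]}:
  # strip everything up to and including the final "/".
  # Intended for paths without a trailing slash.
  printf '%s\n' "${1##*/}"
}
```

For instance, last_path_element /var/log/nginx/access.log prints access.log, which is the value the filter would put into src for that path.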
java.conf
filter {
  if [fields][tag] == "java" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  } #End if
}
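The grok step pulls the first ISO8601 timestamp out of each message so that the date filter can use it as the event time. To see which part of a log line that is, the sketch below uses grep with a regular expression that only approximates the TIMESTAMP_ISO8601 grok pattern; the function name and the sample log line are invented.

```shell
extract_iso8601() {
  # Print the first ISO8601-looking timestamp in the given line, if any.
  printf '%s\n' "$1" \
    | grep -oE '[0-9]{4}-[0-9]{2}-[0-9]{2}[T ][0-9]{2}:[0-9]{2}(:[0-9]{2}([.,][0-9]+)?)?(Z|[+-][0-9]{2}(:?[0-9]{2})?)?' \
    | head -n 1
}
```

Calling extract_iso8601 '2024-11-27 10:15:30,123 INFO Server startup' prints 2024-11-27 10:15:30,123. A line without such a timestamp prints nothing, which corresponds to a grok match failure in Logstash.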
kafkainput.conf
input {
  kafka {
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}
#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }
#}
nginx.conf
filter {
  if [fields][tag] == "nginx-access" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr" , "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  } #endif
}
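The grok pattern expects an access log line with the client address first, the request, status and body size next, and then six quoted fields, the third of which is x_forwarded_for (the field passed to geoip). As a rough illustration of that layout, the awk sketch below pulls a few of those fields from a sample line; it is not a substitute for grok, and the function name and sample line are invented.

```shell
nginx_fields() {
  # $1 = one access log line in the layout the grok pattern expects.
  # Split on double quotes, then on spaces inside the unquoted parts.
  printf '%s\n' "$1" | awk -F'"' '{
    split($1, head, " ")     # head[1] = remote_addr
    split($2, req, " ")      # req[1] = method, req[2] = url
    split($3, status, " ")   # status[1] = response_code
    print head[1], req[1], req[2], status[1], $8   # $8 = x_forwarded_for
  }'
}
```

Given the line 203.0.113.9 - - [27/Nov/2024:10:15:30 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "curl/7.29.0" "198.51.100.7" "0.005" "127.0.0.1:8080" "200", the function prints 203.0.113.9 GET /index.html 200 198.51.100.7.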
output.conf
output {
  if [fields][tag] == "nginx-access" {
    stdout {}
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.67:9200"]
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      index => "%{[host][name]}-%{[src]}"
    }
  }
}
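The nginx-access output writes to one index per project and per day, using the pattern logstash-%{[fields][proname]}-%{+YYYY.MM.dd}, where the date part is taken from the event's @timestamp in UTC. The helper below is a minimal sketch that builds the same name in shell so the index can be looked up; the function name and the project name shop are hypothetical.

```shell
daily_index_name() {
  # $1 = value of [fields][proname]; $2 = optional date as YYYY.MM.dd.
  # Without a date, use today's date in UTC.
  day="${2:-$(date -u +%Y.%m.%d)}"
  printf 'logstash-%s-%s\n' "$1" "$day"
}
```

For example, curl -s -u elastic "http://172.16.11.67:9200/_cat/indices/$(daily_index_name shop)?v" would list today's index for that project once events have been written to it.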
That concludes this walkthrough of deploying the ELK log analysis system on CentOS 7.6. We hope it serves as a useful reference.