Centos7.6如何部署ELK日志分析系统
发表于:2025-02-11 作者:千家信息网编辑
千家信息网最后更新 2025年02月11日,这篇文章将为大家详细讲解有关Centos7.6如何部署ELK日志分析系统,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。下载elasticsearch创建elk用户并
千家信息网最后更新 2025年02月11日Centos7.6如何部署ELK日志分析系统
这篇文章将为大家详细讲解有关Centos7.6如何部署ELK日志分析系统,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
下载elasticsearch
创建elk用户并授权
useradd elkchown -R elk:elk /home/elk/elasticsearchchown -R elk:elk /home/elk/elasticsearch2chown -R elk:elk /home/elk/elasticsearch3mkdir -p /home/eladatamkdir -p /var/log/elkchown -R elk:elk /home/eladatachown -R elk:elk /var/log/elk
主节点master
elasticsearch解压,修改配置文件
/home/elk/elasticsearch/config[root@localhost config]# grep -v "^#" elasticsearch.yml cluster.name: my-applicationnode.name: node0node.master: truenode.attr.rack: r1node.max_local_storage_nodes: 3path.data: /home/eladatapath.logs: /var/log/elkhttp.cors.enabled: truehttp.cors.allow-origin: "*"network.host: 192.168.1.70http.port: 9200transport.tcp.port: 9301discovery.zen.minimum_master_nodes: 1cluster.initial_master_nodes: ["node0"]
手动启动命令
su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
启动文件 elasticsearch.service
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat elasticsearch.service [Unit]Description=ElasticsearchDocumentation=http://www.elastic.coWants=network-online.targetAfter=network-online.target[Service]RuntimeDirectory=elasticsearchPrivateTmp=trueEnvironment=ES_HOME=/home/elk/elasticsearchEnvironment=ES_PATH_CONF=/home/elk/elasticsearch/configEnvironment=PID_DIR=/var/run/elasticsearchEnvironmentFile=-/etc/sysconfig/elasticsearchWorkingDirectory=/home/elk/elasticsearchUser=elkGroup=elkExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quietStandardOutput=journalStandardError=inheritLimitNOFILE=65536LimitNPROC=4096LimitAS=infinityLimitFSIZE=infinityTimeoutStopSec=0KillSignal=SIGTERMKillMode=processSendSIGKILL=noSuccessExitStatus=143[Install]WantedBy=multi-user.target[root@localhost system]#
Node1节点
/home/elk/elasticsearch2/config[root@localhost config]# grep -v "^#" elasticsearch.yml cluster.name: my-applicationnode.name: node1node.master: falsenode.attr.rack: r1node.max_local_storage_nodes: 3path.data: /home/eladatapath.logs: /var/log/elkhttp.cors.enabled: truehttp.cors.allow-origin: "*"network.host: 192.168.1.70transport.tcp.port: 9303http.port: 9302discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"][root@localhost config]#
手动启动命令
su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch2 -d'
启动文件 elasticsearch2.service
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat elasticsearch2.service [Unit]Description=ElasticsearchDocumentation=http://www.elastic.coWants=network-online.targetAfter=network-online.target[Service]RuntimeDirectory=elasticsearch2PrivateTmp=trueEnvironment=ES_HOME=/home/elk/elasticsearch2Environment=ES_PATH_CONF=/home/elk/elasticsearch2/configEnvironment=PID_DIR=/var/run/elasticsearchEnvironmentFile=-/etc/sysconfig/elasticsearchWorkingDirectory=/home/elk/elasticsearchUser=elkGroup=elkExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quietStandardOutput=journalStandardError=inheritLimitNOFILE=65536LimitNPROC=4096LimitAS=infinityLimitFSIZE=infinityTimeoutStopSec=0KillSignal=SIGTERMKillMode=processSendSIGKILL=noSuccessExitStatus=143[Install]WantedBy=multi-user.target[root@localhost system]#
Node2节点
/home/elk/elasticsearch3/config[root@localhost config]# grep -v "^#" elasticsearch.yml cluster.name: my-applicationnode.name: node2node.attr.rack: r1node.master: falsenode.max_local_storage_nodes: 3path.data: /home/eladatapath.logs: /var/log/elkhttp.cors.enabled: truehttp.cors.allow-origin: "*"network.host: 192.168.1.70http.port: 9203transport.tcp.port: 9304discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]discovery.zen.minimum_master_nodes: 1[root@localhost config]#
手动启动命令
su elk -l -c '/home/elk/elasticsearch3/bin/elasticsearch3 -d'
启动文件 elasticsearch3.service
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat elasticsearch3.service [Unit]Description=ElasticsearchDocumentation=http://www.elastic.coWants=network-online.targetAfter=network-online.target[Service]RuntimeDirectory=elasticsearch3PrivateTmp=trueEnvironment=ES_HOME=/home/elk/elasticsearch3Environment=ES_PATH_CONF=/home/elk/elasticsearch3/configEnvironment=PID_DIR=/var/run/elasticsearchEnvironmentFile=-/etc/sysconfig/elasticsearchWorkingDirectory=/home/elk/elasticsearch3User=elkGroup=elkExecStart=/home/elk/elasticsearch3/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quietStandardOutput=journalStandardError=inheritLimitNOFILE=65536LimitNPROC=4096LimitAS=infinityLimitFSIZE=infinityTimeoutStopSec=0KillSignal=SIGTERMKillMode=processSendSIGKILL=noSuccessExitStatus=143[Install]WantedBy=multi-user.target[root@localhost system]#
下载logstash
目录如下,默认配置即可
[root@localhost logstash]# pwd/home/elk/logstash[root@localhost logstash]#
手动启动命令
./logstash -f ../dev.conf nohup ./logstash -f ../dev.conf &
下载kibana
配置文件如下
[root@localhost config]# pwd/home/elk/kibana/config[root@localhost config]# grep -v "^#" kibana.yml server.host: "192.168.1.70"elasticsearch.hosts: ["http://192.168.1.70:9200"]kibana.index: ".kibana"i18n.locale: "zh-CN"
手动启动命令
./kibananohup ./kibana &
kibana启动文件
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat kibana.service [Unit]Description=Kibana Server Manager[Service]ExecStart=/home/elk/kibana/bin/kibana[Install]WantedBy=multi-user.target[root@localhost system]#
端口为:5601 访问:192.168.1.70:5601
安装Elasticsearch -head
yum install git npmgit clone https://github.com/mobz/elasticsearch-head.git [root@localhost elasticsearch-head]# pwd/home/elk/elasticsearch-head[root@localhost elasticsearch-head]#
启动
npm install npm run startnohup npm run start &
curl -XPUT '192.168.2.67:9100/book'
访问192.168.2.67:9100 即可访问
下载kafka
修改配置文件如下
[root@localhost config]# pwd/home/elk/kafka/config[root@localhost config]# grep -v "^#" server.properties broker.id=0listeners=PLAINTEXT://192.168.1.70:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/var/log/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0delete.topic.enable=true[root@localhost config]#
kafka配置启动zookeeper
手动启动方式
[root@localhost bin]# pwd/home/elk/kafka/bin[root@localhost bin]#./zookeeper-server-start.sh ../config/zookeeper.properties
systemctl 启动zookeeper
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat zookeeper.service [Service]Type=forkingSyslogIdentifier=zookeeperRestart=alwaysRestartSec=0sExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.propertiesExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh[root@localhost system]#
启动kafka服务
手动启动方式
./kafka-server-start.sh ../config/server.properties
systemctl 启动kafka
[root@localhost system]# pwd/lib/systemd/system[root@localhost system]# cat kafka.service [Unit]Description=Apache kafkaAfter=network.target[Service]Type=simpleRestart=alwaysRestartSec=0sExecStart=/home/elk/kafka/bin/kafka-server-start.sh /home/elk/kafka/config/server.propertiesExecStop=/home/elk/kafka/bin/kafka-server-stop.sh[root@localhost system]#
测试kafka
新建一个名字为test的topic
/kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test
查看kafka中的topic
./kafka-topics.sh --list --zookeeper 192.168.1.70:2181
往kafka topic为test中 生产消息
./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test
在kafka topic为test中 消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning
生产的消息,消费那边接受到即是ok的
目标机器安装filebeat
安装6.5版本的
[root@localhost filebeat]# pwd/usr/local/filebeat[root@localhost filebeat]# cat filebeat.yml filebeat.prospectors:- type: log paths: - /opt/logs/workphone-tcp/catalina.out fields: tag: 54_tcp_catalina_out- type: log paths: - /opt/logs/workphone-webservice/catalina.out fields: tag: 54_web_catalina_outname: 192.168.1.54filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: falsesetup.template.settings: index.number_of_shards: 3output.kafka: hosts: ["192.168.1.70:9092"] topic: "filebeat-log" partition.hash: reachable_only: true compression: gzip max_message_bytes: 1000000 required_acks: 1[root@localhost filebeat]#
安装完成后去logstash编辑配置文件
logstash操作
[root@localhost logstash]# pwd/home/elk/logstash[root@localhost logstash]# cat dev.conf input { kafka{ bootstrap_servers => "192.168.1.70:9092" topics => ["filebeat-log"] codec => "json" }}filter { if [fields][tag]=="jpwebmap" { json{ source => "message" remove_field => "message" } geoip { source => "client" target => "geoip" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } if [fields][tag] == "54_tcp_catalina_out"{ grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "ISO8601"] } mutate { remove_field => [ "logdate" ] } } if [fields][tag] == "54_web_catalina_out"{ grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "ISO8601"] } mutate { remove_field => [ "logdate" ] } } if [fields][tag] == "55_tcp_catalina_out"{ grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "ISO8601"] } mutate { remove_field => [ "logdate" ] } } if [fields][tag] == "55_web_catalina_out"{ grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "ISO8601"] } mutate { remove_field => [ "logdate" ] } } if [fields][tag] == "51_nginx80_access_log" { mutate { add_field => { "spstr" => "%{[log][file][path]}" } } mutate { split => ["spstr" , "/"] # save the last element of the array as the api_method. add_field => ["src", "%{[spstr][-1]}" ] } mutate{ remove_field => [ "friends", "ecs", "agent" , "spstr" ] } grok { match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" } remove_field => "message" } date { match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } geoip { source => "x_forwarded_for" target => "geoip" database => "/home/elk/logstash/GeoLite2-City.mmdb" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } }}output {if [fields][tag] == "wori"{ elasticsearch { hosts => ["192.168.1.70:9200"] index => "zabbix" } }if [fields][tag] == "54_tcp_catalina_out"{ elasticsearch { hosts => ["192.168.1.70:9200"] index => "54_tcp_catalina_out" } }if [fields][tag] == "54_web_catalina_out"{ elasticsearch { hosts => ["192.168.1.70:9200"] index => "54_web_catalina_out" } }if [fields][tag] == "55_tcp_catalina_out"{ elasticsearch { hosts => ["192.168.1.70:9200"] index => "55_tcp_catalina_out" } } if [fields][tag] == "55_web_catalina_out"{ elasticsearch { hosts => ["192.168.1.70:9200"] index => "55_web_catalina_out" } }if [fields][tag] == "51_nginx80_access_log" { stdout{} elasticsearch { hosts => ["192.168.1.70:9200"] index => "51_nginx80_access_log" } }}
其他的配置文件
index.conf
filter { mutate { add_field => { "spstr" => "%{[log][file][path]}" } } mutate { split => ["spstr" , "/"] # save the last element of the array as the api_method. add_field => ["src", "%{[spstr][-1]}" ] } mutate{ remove_field => [ "friends", "ecs", "agent" , "spstr" ] }}
java.conf
filter {if [fields][tag] == "java"{ grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "ISO8601"] } mutate { remove_field => [ "logdate" ] } } #End if}
kafkainput.conf
input { kafka{ bootstrap_servers => "172.16.11.68:9092" #topics => ["ql-prod-tomcat" ] topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ] codec => "json" consumer_threads => 5 decorate_events => true #auto_offset_reset => "latest" group_id => "logstash" #client_id => "" ############################# HELK Optimizing Latency ############################# fetch_min_bytes => "1" request_timeout_ms => "305000" ############################# HELK Optimizing Availability ############################# session_timeout_ms => "10000" max_poll_records => "550" max_poll_interval_ms => "300000" }}#input {# kafka{# bootstrap_servers => "172.16.11.68:9092"# topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]# codec => "json"# consumer_threads => 15# decorate_events => true# auto_offset_reset => "latest"# group_id => "logstash-1"# ############################# HELK Optimizing Latency ############################## fetch_min_bytes => "1"# request_timeout_ms => "305000"# ############################# HELK Optimizing Availability ############################## session_timeout_ms => "10000"# max_poll_records => "550"# max_poll_interval_ms => "300000"# }#}
nginx.conf
filter {if [fields][tag] == "nginx-access" { mutate { add_field => { "spstr" => "%{[log][file][path]}" } } mutate { split => ["spstr" , "/"] # save the last element of the array as the api_method. add_field => ["src", "%{[spstr][-1]}" ] } mutate{ remove_field => [ "friends", "ecs", "agent" , "spstr" ] } grok { match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" } remove_field => "message" } date { match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } geoip { source => "x_forwarded_for" target => "geoip" database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } #endif}
ouput.conf
output{ if [fields][tag] == "nginx-access" { stdout{} elasticsearch { user => elastic password => WR141bp2sveJuGFaD4oR hosts => ["172.16.11.67:9200"] index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}" } } #stdout{} if [fields][tag] == "java" { elasticsearch { user => elastic password => WR141bp2sveJuGFaD4oR hosts => ["172.16.11.66:9200","172.16.11.68:9200"] index => "%{[host][name]}-%{[src]}" } }}
关于"Centos7.6如何部署ELK日志分析系统"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
文件
手动
配置
命令
消息
篇文章
节点
日志
系统
分析
方式
更多
消费
生产
不错
实用
内容
名字
文章
机器
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全经费管理办法
青岛点睛网络技术有限公司
php数据库提取数据
学习软件开发工资待遇
谷歌网络安全项目
多线程创建的数据库连接如何释放
三级网络技术用什么教材
个人计划软件开发
删除数据库备份
公安执法监督管理服务器
cc服务器是什么
多媒体方面数据库的用处
数据库插件怎么显示宝箱
宣城市网络安全公开课
计算机网络技术算高级么
软件开发兼职 一天多少钱
河北北斗授时服务器虚拟主机
计算机网络技术介绍一本书作文
软件开发售后协议书
mongo数据库安全漏洞
软件开发费用进展
用童谣写网络安全的思维导图
腾讯云数据库的好处
河南省一坦网络技术
电网网络安全系统
长沙服务器内存条回收
广达X99 双路刀片服务器主板
软件开发税率13个点
APACHE 服务器
上海公安局数据库被攻陷