千家信息网

How to set up ES/Kibana/Logstash (ELK) with docker-compose

Published: 2024-10-14  Author: 千家信息网 editorial staff

This article explains how to set up ES, Kibana, and Logstash (ELK) with docker-compose. Many people get stuck on this in practice, so the steps below walk through a working setup from start to finish.

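1. Prepare two CentOS 7 virtual machines (192.168.65.135 and 192.168.65.136 in the configuration below)

2. Create the es directory under /home/docker-container, then create a node folder inside it. The names must match the volume paths in your compose file: the examples below use /home/docker_container/es/master on the first machine and /home/docker-container/es/node1 on the second.

Inside the node folder, create the data, logs, and conf directories.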
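The directory layout described above can be created in one step. The following is a minimal sketch, not the author's script: make_es_dirs is a hypothetical helper, and the base path and node name passed to it are assumptions that must match the volume paths in your compose file.

```shell
# Create the data/logs/conf layout for one Elasticsearch node.
# Usage: make_es_dirs <base_dir> <node_name>
make_es_dirs() {
  local base="$1" node="$2"
  local sub
  for sub in data logs conf; do
    # -p creates missing parent directories and is harmless if the directory exists
    mkdir -p "$base/$node/$sub" || return 1
  done
}

# Example (hypothetical paths, adjust to your compose file):
#   make_es_dirs /home/docker-container/es node1
```

Because mkdir -p is idempotent, the helper can be run again safely on a machine that already has part of the layout.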

In the conf directory, create elasticsearch.yml with the following content:

cluster.name: elasticsearch-cluster
node.name: es01
network.bind_host: 0.0.0.0
network.publish_host: 192.168.65.135
http.port: 9200
transport.tcp.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["192.168.65.135:9300","192.168.65.136:9300"]
# The quorum for N master-eligible nodes is N/2 + 1, so with two such nodes
# a value of 1 does not protect against split brain.
discovery.zen.minimum_master_nodes: 1
path.logs: /usr/share/elasticsearch/logs
xpack.security.audit.enabled: true

Go back to the es directory and create docker-compose.yml with the following content:

version: '3'
services:
  es01:
    image: elasticsearch:6.6.1
    container_name: es01
    restart: always
    volumes:
      - /home/docker_container/es/master/data:/usr/share/elasticsearch/data:rw
      - /home/docker_container/es/master/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      # the container-side path must match path.logs in elasticsearch.yml
      - /home/docker_container/es/master/logs:/usr/share/elasticsearch/logs:rw
      - /home/docker_container/es/master/plugin1:/usr/share/elasticsearch/plugins:rw
    ports:
      - "9200:9200"
      - "9300:9300"

Repeat the same steps on the other machine.

Create elasticsearch.yml:

cluster.name: elasticsearch-cluster
node.name: es02
network.bind_host: 0.0.0.0
network.publish_host: 192.168.65.136
http.port: 9200
transport.tcp.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["192.168.65.135:9300","192.168.65.136:9300"]
discovery.zen.minimum_master_nodes: 1
path.logs: /usr/share/elasticsearch/logs
xpack.security.audit.enabled: true

Create docker-compose.yml:

version: '3'
services:
  es02:
    image: elasticsearch:6.6.1
    container_name: es02
    restart: always
    volumes:
      - /home/docker-container/es/node1/data:/usr/share/elasticsearch/data:rw
      - /home/docker-container/es/node1/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      # the container-side path must match path.logs in elasticsearch.yml
      - /home/docker-container/es/node1/logs:/usr/share/elasticsearch/logs:rw
      - /home/docker-container/es/node1/plugin1:/usr/share/elasticsearch/plugins:rw
    ports:
      - "9200:9200"
      - "9300:9300"

On each machine, run the following in the directory that contains docker-compose.yml:

docker-compose up -d
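Once both containers are up, it is worth confirming that the two nodes actually joined the same cluster. The following is a minimal sketch, assuming curl is installed and port 9200 is reachable from where you run it. es_health_url and check_es_cluster are hypothetical helper names; the _cluster/health endpoint itself is part of Elasticsearch.

```shell
# Build the cluster-health URL for one node (the port defaults to 9200).
es_health_url() {
  printf 'http://%s:%s/_cluster/health?pretty\n' "$1" "${2:-9200}"
}

# Ask one node for the cluster health and print the JSON response.
check_es_cluster() {
  curl -s --max-time 5 "$(es_health_url "$1" "${2:-9200}")"
}

# Example:
#   check_es_cluster 192.168.65.135
```

If the nodes found each other, the response reports number_of_nodes as 2. A value of 1 usually means the nodes could not reach each other on the transport port 9300.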

Install Kibana:

docker-compose.yml

version: '3'
services:
  kibana:
    image: kibana:6.6.1
    container_name: kibana
    volumes:
      - /home/docker_container/kibana/conf/kibana.yml:/usr/share/kibana/config/kibana.yml
    restart: always
    ports:
      - 5601:5601

kibana.yml:

elasticsearch.hosts: ["http://192.168.65.135:9200"]
server.host: "0.0.0.0"
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN

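If the page shows "Kibana server is not ready yet", check the following.

Cause 1: the Kibana and ES versions do not match (this is the explanation most often given online).

Fix: use the same version for Kibana and ES.

Cause 2: kibana.yml is misconfigured (the log shows "Error: No Living connections").

Fix: set the Elasticsearch address in kibana.yml (elasticsearch.url, or elasticsearch.hosts as in the file above) to the correct URL. The default is http://elasticsearch:9200.

Change it to http://<your IP address>:9200.

Cause 3: Kibana has not finished starting and the browser is still showing the old response.

Fix: refresh the browser a few times (it took me six refreshes before the page loaded).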
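The first two causes can be checked from the command line before resorting to refreshing. This is a minimal sketch, assuming curl and docker are available and that the container is named kibana as in the compose file above. The helper names are hypothetical; the Elasticsearch root endpoint, docker inspect, and docker logs are standard.

```shell
# Pull the version number out of the JSON that Elasticsearch returns at its root URL.
es_version_from_json() {
  sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -n 1
}

# Version reported by a running Elasticsearch node.
es_version() {
  curl -s --max-time 5 "http://$1:9200" | es_version_from_json
}

# Image (including the tag) that the kibana container was started from.
kibana_image() {
  docker inspect --format '{{.Config.Image}}' kibana
}

# Last lines of the Kibana log, where "No Living connections" would show up.
kibana_log_tail() {
  docker logs --tail 50 kibana
}

# Example:
#   es_version 192.168.65.135
#   kibana_image
#   kibana_log_tail
```

If the version printed by es_version differs from the tag printed by kibana_image, cause 1 applies. If the log tail contains "No Living connections", check the address in kibana.yml as described under cause 2.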

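Logstash:

Logstash is installed without Docker. Upload logstash-6.4.3.tar.gz to the /home/software directory.

Extract the archive and change into the extracted logstash-6.4.3 directory.

Edit config/pipelines.yml with vim (to connect more sources, append another pipeline.id and path.config pair; each pipeline.id must be unique):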

#   Default is path.data/dead_letter_queue
#
#   path.dead_letter_queue:
- pipeline.id: table1
  path.config: "/home/software/logstash-6.4.3/conf/mysql_1.conf"
- pipeline.id: table2
  path.config: "/home/software/logstash-6.4.3/conf/mysql.conf"

mysql.conf:

input {
  jdbc {
    jdbc_driver_library => "/home/software/logstash-6.4.3/sql/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://ip:3306/test"
    jdbc_user => "root"
    jdbc_password => ""
    # cron-style schedule: run once every minute
    schedule => "* * * * *"
    # :sql_last_value is the update_time value recorded at the end of the previous run
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}
output {
  elasticsearch {
    # ES hosts and ports
    hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
    # index name, can be chosen freely
    index => "user"
    # the source table must have an id column; it is used as the document id
    document_id => "%{id}"
    document_type => "user"
  }
  stdout {
    # print each event as a JSON line
    codec => json_lines
  }
}

mysql_1.conf:

input {
  jdbc {
    jdbc_driver_library => "/home/software/logstash-6.4.3/sql/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://ip:3306/lvz_goods?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"
    jdbc_user => "root"
    jdbc_password => ""
    # cron-style schedule: run once every minute
    schedule => "* * * * *"
    statement => "SELECT * FROM lvz_product WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    # mysql.conf uses this same file name; give each pipeline its own path
    # so the two pipelines do not overwrite each other's sync point
    last_run_metadata_path => "syncpoint_table"
  }
}
output {
  elasticsearch {
    # ES hosts and ports
    hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
    # index name, can be chosen freely
    index => "goods"
    # the source table must have an id column; it is used as the document id
    document_id => "%{id}"
    document_type => "goods"
  }
  stdout {
    # print each event as a JSON line
    codec => json_lines
  }
}

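Note: the MySQL connector jar must be placed at the path given in jdbc_driver_library. With that, the three main components are installed.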
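The steps above stop at the configuration files, so here is a minimal sketch of checking and starting Logstash. It assumes the install directory used above; LOGSTASH_HOME, check_pipeline, and start_logstash are hypothetical names. When bin/logstash is started without -f or -e, it reads config/pipelines.yml, and --config.test_and_exit only validates a file and then exits.

```shell
# Install directory from the steps above; override it if yours differs.
LOGSTASH_HOME="${LOGSTASH_HOME:-/home/software/logstash-6.4.3}"

# Syntax-check a single pipeline file without running it.
check_pipeline() {
  "$LOGSTASH_HOME/bin/logstash" -f "$1" --config.test_and_exit
}

# Start Logstash in the background. With no -f/-e argument it loads every
# pipeline listed in config/pipelines.yml.
start_logstash() {
  ( cd "$LOGSTASH_HOME" && nohup bin/logstash > logstash.out 2>&1 & )
}

# Example:
#   check_pipeline /home/software/logstash-6.4.3/conf/mysql.conf
#   start_logstash
```

Checking each file first is cheaper than reading a startup failure out of logstash.out, because Logstash takes a while to boot.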

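Integrating the IK and pinyin analyzers into ES:

1. Download elasticsearch-analysis-ik-6.6.1.zip

Upload it to the node1 and node2 directories on the two machines, then unzip it and rename the folder to ik.

2. Download elasticsearch-analysis-pinyin-6.6.1.zip

Upload it to the node1 and node2 directories on the two machines, then unzip it and rename the folder to pinyin.

Edit the docker-compose file so that the plugin directory is mounted (the ik and pinyin folders must sit inside the mounted plugin1 directory to be loaded):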

 /home/docker-container/es/node1/plugin1:/usr/share/elasticsearch/plugins:rw

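Restart ES, then open the Kibana console and run the following.

First delete the goods index, then recreate it with two custom analyzers that combine IK and pinyin, ik_smart_pinyin and ik_max_word_pinyin: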

DELETE /goods

PUT /goods
{
  "settings": {
    "analysis": {
      "analyzer": {
        "ik_smart_pinyin": {
          "type": "custom",
          "tokenizer": "ik_smart",
          "filter": ["my_pinyin", "word_delimiter"]
        },
        "ik_max_word_pinyin": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "filter": ["my_pinyin", "word_delimiter"]
        }
      },
      "filter": {
        "my_pinyin": {
          "type": "pinyin",
          "keep_separate_first_letter": true,
          "keep_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "lowercase": true,
          "remove_duplicated_term": true
        }
      }
    }
  }
}

Define the goods mapping so that the name and detail fields use ik_smart_pinyin:

POST /goods/_mapping/goods
{
  "goods": {
    "properties": {
      "@timestamp": { "type": "date" },
      "@version": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
      },
      "attribute_list": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
      },
      "category_id": { "type": "long" },
      "created_time": { "type": "date" },
      "detail": {
        "type": "text",
        "analyzer": "ik_smart_pinyin",
        "search_analyzer": "ik_smart_pinyin"
      },
      "id": { "type": "long" },
      "main_image": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
      },
      "name": {
        "type": "text",
        "analyzer": "ik_smart_pinyin",
        "search_analyzer": "ik_smart_pinyin"
      },
      "revision": { "type": "long" },
      "status": { "type": "long" },
      "sub_images": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
      },
      "subtitle": {
        "type": "text",
        "analyzer": "ik_smart",
        "search_analyzer": "ik_smart"
      },
      "updated_time": { "type": "date" }
    }
  }
}
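To see what a custom analyzer actually produces, the _analyze endpoint of the index can be called with a sample string. The following is a minimal sketch, assuming curl is installed and the goods index exists with the settings above. analyze_body and analyze_goods are hypothetical helper names, and the sample text is only an illustration.

```shell
# Build the JSON body for an _analyze request.
# The text must not contain double quotes or backslashes (no escaping is done here).
analyze_body() {
  printf '{"analyzer":"%s","text":"%s"}\n' "$1" "$2"
}

# Run a sample text through one of the analyzers defined on the goods index.
# Usage: analyze_goods <es_host> <analyzer> <text>
analyze_goods() {
  curl -s --max-time 5 -H 'Content-Type: application/json' \
    -X POST "http://$1:9200/goods/_analyze?pretty" \
    -d "$(analyze_body "$2" "$3")"
}

# Example:
#   analyze_goods 192.168.65.135 ik_smart_pinyin "手机"
```

The response lists the tokens the analyzer emits. An error saying the analyzer or tokenizer is unknown usually means the ik or pinyin plugin was not loaded from the mounted plugins directory.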

Install Kafka:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"             # container port 9092 is published on a random host port
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.65.135           # IP of this host
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: test:1:1
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Start two Kafka nodes (this brings up a two-broker Kafka cluster on this host):

docker-compose up -d --scale kafka=2
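Because container port 9092 is published on a random host port, the ports have to be looked up before they can be used as bootstrap servers. This is a minimal sketch, assuming it is run in the directory that holds the Kafka compose file. port_of, kafka_host_port, and kafka_bootstrap_servers are hypothetical helper names; docker-compose port with --index is the standard subcommand for this lookup.

```shell
# Keep only the port from an address such as "0.0.0.0:32768".
port_of() {
  printf '%s\n' "${1##*:}"
}

# Host port that container port 9092 of the Nth kafka container is published on.
kafka_host_port() {
  port_of "$(docker-compose port --index="$1" kafka 9092)"
}

# Build a bootstrap-servers value for the two brokers started with --scale kafka=2.
# Usage: kafka_bootstrap_servers <host_ip>
kafka_bootstrap_servers() {
  local host="$1"
  printf '%s:%s,%s:%s\n' "$host" "$(kafka_host_port 1)" "$host" "$(kafka_host_port 2)"
}

# Example:
#   kafka_bootstrap_servers 192.168.65.135
```

The result has the form host:port,host:port, which is the format used for bootstrap-servers in application.yml and for bootstrap_servers in the Logstash Kafka input. The ports can change when the containers are recreated, so look them up again after that.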

Integrate Kafka and ELK into the local project.

application.yml

### service port
server:
  port: 8500
### service name (the name registered with Eureka)
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8100/eureka
spring:
  application:
    name: app-lvz-goods
  redis:
    host: 192.168.65.136
    port: 6379
    password: feilvzhang
    pool:
      max-idle: 100
      min-idle: 1
      max-active: 1000
      max-wait: -1
  ### database connection settings
  datasource:
    username: root
    password:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.125.113:3306/lvz_goods?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
  data:
    elasticsearch:
      #### cluster name
      cluster-name: elasticsearch-cluster
      #### node addresses
      cluster-nodes: 192.168.65.135:9300,192.168.65.136:9300
  kafka:
    bootstrap-servers: 192.168.65.135:32768,192.168.65.135:32769

Logging aspect:

package com.lvz.shop.elk.aop;

import com.alibaba.fastjson.JSONObject;
import com.lvz.shop.elk.kafka.KafkaSender;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * @description: Intercepts service calls and sends request/response logs to ELK
 * @author: flz
 * @date: 2019/8/9 15:57
 */
@Aspect
@Component
public class AopLogAspect {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    // Pointcut: any method of any class in an impl package one level below com.lvz.shop
    @Pointcut("execution(* com.lvz.shop.*.impl.*.*(..))")
    private void serviceAspect() {
    }

    // Runs before the intercepted method and records the request
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        if (requestAttributes == null) {
            // Not running inside an HTTP request, so there is no request to log
            return;
        }
        HttpServletRequest request = requestAttributes.getRequest();
        // // Print the request
        // log.info("=============== request ===============");
        // log.info("URL: " + request.getRequestURL().toString());
        // log.info("HTTP method: " + request.getMethod());
        // log.info("Signature: " + joinPoint.getSignature());
        // log.info("Arguments: " + Arrays.toString(joinPoint.getArgs()));
        // log.info("=============== request ===============");
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        // request time
        jsonObject.put("request_time", df.format(new Date()));
        // request URL
        jsonObject.put("request_url", request.getRequestURL().toString());
        // HTTP method
        jsonObject.put("request_method", request.getMethod());
        // signature of the intercepted method, stored as a string
        jsonObject.put("signature", joinPoint.getSignature().toString());
        // method arguments
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        kafkaSender.send(requestJsonObject);
    }

    // Runs after the intercepted method returns and records the response
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        // log.info("-------------- response ----------------");
        // log.info("Response: " + gson.toJson(o));
        // log.info("-------------- response ----------------");
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        respJSONObject.put("response", jsonObject);
        kafkaSender.send(respJSONObject);
    }
}

Kafka message sender:

package com.lvz.shop.elk.kafka;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @description: Producer
 * @author: flz
 * @date: 2019/8/9 15:59
 */
@Component
@Slf4j
public class KafkaSender<T> {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Send a message to Kafka.
     *
     * @param obj the message object, serialized to JSON before sending
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        log.info("------------ message = {}", jsonObj);
        // Send to the goods_mylog topic, which the Logstash pipeline reads
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("goods_mylog", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO business handling
                log.info("Produce: The message was sent successfully:");
                log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
            }
        });
    }
}

Exception logging:

package com.lvz.shop.elk.aop.error;

import com.alibaba.fastjson.JSONObject;
import com.lvz.shop.elk.kafka.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @description: Global exception handling
 * @author: flz
 * @date: 2019/8/9 15:56
 */
// Uncomment @ControllerAdvice to activate this handler; while it is
// commented out, Spring does not register this class.
//@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    @ExceptionHandler(RuntimeException.class)
    @ResponseBody
    public JSONObject exceptionHandler(Exception e) {
        log.error("### global exception caught ###", e);
        // 1. Build the error log entry
        JSONObject errorJson = new JSONObject();
        JSONObject logJson = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        logJson.put("request_time", df.format(new Date()));
        logJson.put("error_info", e);
        errorJson.put("request_error", logJson);
        kafkaSender.send(errorJson);
        // 2. Return the error response
        JSONObject result = new JSONObject();
        result.put("code", 500);
        result.put("msg", "System error");
        return result;
    }
}

Configure Logstash to read from Kafka and write to ES:

goods_mylog.conf:

input {
  kafka {
    # comma-separated list of host:port pairs, given as a single string
    bootstrap_servers => "192.168.65.135:32768,192.168.65.135:32769"
    topics => ["goods_mylog"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
    index => "goods_mylog"
  }
}

mylog.conf:

input {
  kafka {
    # comma-separated list of host:port pairs, given as a single string
    bootstrap_servers => "192.168.65.135:32768,192.168.65.135:32769"
    topics => ["my_log"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
    index => "my_log"
  }
}

Start the project and call its endpoints. ELK then begins collecting the logs, and you can view them in Kibana.
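Besides looking in Kibana, a quick way to confirm that log events reached Elasticsearch is to count the documents in the target index. The following is a minimal sketch, assuming curl is installed and the application has already handled a few requests. count_from_json and index_doc_count are hypothetical helper names; _count is a standard Elasticsearch endpoint.

```shell
# Pull the number out of a _count response such as {"count":5,"_shards":{...}}.
count_from_json() {
  sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Number of documents currently stored in an index.
# Usage: index_doc_count <es_host> <index>
index_doc_count() {
  curl -s --max-time 5 "http://$1:9200/$2/_count" | count_from_json
}

# Example:
#   index_doc_count 192.168.65.135 goods_mylog
```

If the count stays at zero or nothing is printed while the application is logging, check the stdout output of the Logstash pipeline first, since the rubydebug codec prints every event that Logstash consumed from Kafka.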