Enterprise-Grade ELFK Integration


Log Monitoring Module

A teaching example for the log monitoring module. The stack is Spring Boot, Elasticsearch, Kibana, Logstash, and Filebeat.

Project Structure

summit-cloud-log
    │  .gitkeep
    │  pom.xml
    │  README.md
    │  summit-cloud-log.iml
    │
    ├─logs      # log files generated by the project
    │  └─summit-cloud-log
    │          error.log
    │          info.log
    │          warn.log
    │
    ├─src
    │  ├─main
    │  │  ├─java
    │  │  │  └─com
    │  │  │      └─summit
    │  │  │          │  SummitCloudLogApplication.java   # application entry point
    │  │  │          │
    │  │  │          ├─controller
    │  │  │          │      logTestController.java  # test endpoints
    │  │  │          │
    │  │  │          └─entity
    │  │  │                  Person.java  # entity class used for testing
    │  │  │
    │  │  └─resources
    │  │          application.properties
    │  │          application.yaml
    │  │          logback-spring.xml  # logback configuration
    │  │
    │  └─test
    │      └─java
    │          └─com
    │              └─summit
    │                      SummitCloudLogApplicationTests.java

Configuration Walkthrough

pom.xml
<!--logstash-logback-encoder-->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>7.0.1</version>
        </dependency>
logstash.conf
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

output {
    elasticsearch {
        hosts => ["43.138.199.12:9200"]
        action=>"index"
        index => "%{[fields][servicename]}-%{+yyyy.MM.dd}"
    }
    stdout{
       codec =>rubydebug
    }
}
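
Once a log line has travelled through this pipeline, an index named after the servicename field plus the date should appear in Elasticsearch. A quick sanity check from the shell, assuming Elasticsearch is reachable on the address used in the output block above:

# list the indices created by this pipeline (host/port taken from the logstash output above)
curl -s 'http://43.138.199.12:9200/_cat/indices?v' | grep summit-cloud-log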
filebeat.yaml

In this example Filebeat runs on localhost, so the scanned directory is a local folder.
The summit-cloud integration section later in this article shows how to deploy Filebeat on a server.

# ============================== Filebeat inputs ===============================

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - C:\Users\123\Desktop\summit-cloud-log\logs\summit-cloud-log\*.log
  fields:
    servicename: summit-cloud-log
  multiline:
    pattern: '^\{'
    negate: true
    match: after
    timeout: 5s

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false
  
# ================================== Outputs ===================================

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["43.138.199.12:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"
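
Before starting Filebeat it can be worth validating this file and the connection to Logstash; a minimal check, assuming the filebeat binary is on the PATH (use filebeat.exe on Windows):

# check that filebeat.yml parses correctly
filebeat test config -c filebeat.yml
# check that the logstash output defined above is reachable
filebeat test output -c filebeat.yml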
docker-compose file
version: "2.2"
services:
  elasticsearch:
    image: elasticsearch:7.4.2
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx1g # heap: 512m initial, 1g max
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elk

  kibana:
    image: kibana:7.4.2
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://43.138.199.12:9200
    ports:
      - 5601:5601
    networks:
      - elk

  logstash:
    image: logstash:7.14.2
    container_name: logstash
    environment:
      - node.name=es01
      - LS_JAVA_OPTS=-Xms512m -Xmx1g # heap: 512m initial, 1g max
    volumes:
      - /home/logstash/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - 5044:5044
    networks:
      - elk

networks:
  elk:
    driver: bridge
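
A possible way to bring the stack up and confirm it is healthy (container names and ports as in the compose file above):

# start elasticsearch, kibana and logstash in the background
docker-compose up -d
# confirm all three containers are running
docker ps
# elasticsearch should answer on 9200; kibana is then reachable on port 5601
curl -s 'http://localhost:9200/_cluster/health?pretty'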
application.properties
# select the active profile
spring.profiles.active=dev
# point Spring Boot at the logback configuration
logging.config=classpath:logback-spring.xml
application.yaml
spring:
  application:
    name: summit-cloud-log # service name; referenced later by the logging configuration (file names, index names, etc.)
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>

<configuration>
    <!-- Log level priority: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->

    <!-- Variables -->
    <!--
        Output pattern:
        %d            date
        %thread       thread name
        %-5level      level, left-aligned to 5 characters
        %msg          log message, %n is a newline
        %logger{36}   logger name, truncated to at most 36 characters
    -->
    <springProperty scope="context" name="spring.application.name" source="spring.application.name"/>
    <property name="LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
    <!-- Log storage path; avoid relative paths in real deployments -->
    <property name="FILE_PATH" value="./logs/${spring.application.name}"/>
    <!-- Service name -->
    <property name="FILE_NAME" value="${spring.application.name}"/>


    <!-- Default console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- output layout -->
        <layout class="ch.qos.logback.classic.PatternLayout">
            <!-- formatted output: %d date, %thread thread name, %-5level level padded to 5 chars, %msg message, %n newline -->
            <pattern>${LOG_PATTERN}</pattern>
        </layout>
    </appender>
    <!--
      Writes INFO-level messages (see the LevelFilter below); whenever a file exceeds the size limit,
      that chunk is rolled into a year-month folder and compressed as an archive
    -->
    <appender name="RollingFileInfo" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${FILE_PATH}/info.log</file>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <charset>UTF-8</charset>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
        <!-- rolls daily (driven by %d in the file name pattern) and also when maxFileSize is exceeded -->
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-INFO.log.gz</FileNamePattern>
            <MaxHistory>30</MaxHistory><!-- keep the last 30 days of logs -->
            <maxFileSize>20MB</maxFileSize><!-- maximum size of a single log file -->
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Writes WARN-level messages (see the LevelFilter below); rolled files are stored in year-month folders and compressed as archives -->
    <appender name="RollingFileWarn" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${FILE_PATH}/warn.log</file>

        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <charset>UTF-8</charset>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
        <!-- rolls daily (driven by %d in the file name pattern) and also when maxFileSize is exceeded -->
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-WARN.log.gz</FileNamePattern>
            <MaxHistory>30</MaxHistory><!-- keep the last 30 days of logs -->
            <maxFileSize>20MB</maxFileSize><!-- maximum size of a single log file -->
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Writes ERROR-level messages (see the LevelFilter below); rolled files are stored in year-month folders and compressed as archives -->
    <appender name="RollingFileError" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${FILE_PATH}/error.log</file>
        <!-- rolls daily (driven by %d in the file name pattern) and also when maxFileSize is exceeded -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <charset>UTF-8</charset>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-ERROR.log.gz</FileNamePattern>
            <MaxHistory>30</MaxHistory><!-- keep the last 30 days of logs -->
            <maxFileSize>20MB</maxFileSize><!-- maximum size of a single log file -->
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- A <logger> element tunes logging for a specific package or class, e.g. to give it its own level. -->
    <!-- An appender only takes effect once it is referenced from a logger (or the root logger). -->

    <!-- per-environment configuration -->
    <springProfile name="dev">
            <!-- suppress the noisy DEBUG output from Spring and MyBatis -->
            <logger name="org.mybatis" level="info" additivity="false">
                <appender-ref ref="STDOUT"/>
            </logger>
            <!-- application logging -->
            <!-- with additivity="false", a child logger writes only to its own appenders and not to its parent's appenders -->
            <Logger name="top.fate" level="info" additivity="false">
                <appender-ref ref="STDOUT"/>
            </Logger>

            <root level="info">
                <appender-ref ref="STDOUT"/>
                <appender-ref ref="RollingFileInfo"/>
                <appender-ref ref="RollingFileWarn"/>
                <appender-ref ref="RollingFileError"/>
            </root>
    </springProfile>

    <springProfile name="test">
        <root level="info">
            <appender-ref ref="RollingFileInfo"/>
            <appender-ref ref="RollingFileWarn"/>
            <appender-ref ref="RollingFileError"/>
        </root>
    </springProfile>

</configuration>
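
With the dev profile active, starting the application should be enough to make the three files from the project tree appear; a quick check, assuming the application is run from the project root:

# the appenders above write relative to the working directory
ls logs/summit-cloud-log/
# follow the INFO file while hitting the test endpoints
tail -f logs/summit-cloud-log/info.log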

Notes on logback Configuration

1. At logback startup, the configuration is located in this order:

  • look for logback-test.xml on the classpath
  • if logback-test.xml is not found, look for logback.groovy on the classpath
  • if logback.groovy is not found, look for logback.xml on the classpath
  • if none of these files exist, use the JDK SPI mechanism to look up META-INF/services/ch.qos.logback.classic.spi.Configurator; the class listed there must implement the Configurator interface, and that implementation is used for configuration
  • if all of the above fail, logback falls back to its built-in BasicConfigurator and writes logs to the console




2. logback log levels

  • TRACE < DEBUG < INFO < WARN < ERROR; the default level is DEBUG




3. logback root and logger configuration

  • root and logger behave a bit like route matching: each logger is a specific route, while root is the catch-all that is matched last but always applies
  • a logger's name attribute takes a package or class name and matches all log output from that package or class
  • the additivity attribute is usually used to keep the same log event from being written by several loggers:

    •   with additivity set to true, an event is handled by the current logger and also passed up to its parent loggers
    •   with additivity set to false, an event is handled only by the current logger and is not passed up to its parent loggers

Project Testing

Use the following endpoints to emit logs and check that the local log files are written correctly.
Also check that the Logstash output on the server is correct and that the corresponding data can be searched in Kibana (sample curl calls follow the controller code below).
package com.summit.controller;


import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.summit.common.logInfo;
import com.summit.entity.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

@RestController
@RequestMapping("/t1")
@Slf4j
public class logTestController {

    @Value("${spring.application.name}")
    private String serviceName; // service name taken from the Spring configuration (spring.application.name)

    @GetMapping("/logging")
    public Object logInfo() {
        Person person = new Person();
        person.setName("omerta");
        person.setAge(20);
        person.setAddress("加利福尼亚");
        Gson gson = new Gson();

        log.info("这里是info测试");
        log.info(gson.toJson(person));



        // Build the response for the front end; it mirrors the log file record format.
        // List that will hold the log entries
        ArrayList<logInfo> logInfos = new ArrayList<>();

        // current timestamp in milliseconds
        long timestamp = System.currentTimeMillis();
        // format the timestamp as an ISO 8601 string
        SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        String timestampStr = DateUtil.format(new Date(timestamp), pdf);

        // current date formatted with the log pattern
        Date date = DateUtil.date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String dateTime = sdf.format(date);

        // name of the current thread
        Thread currentThread = Thread.currentThread();
        String currentThreadName = currentThread.getName();



        logInfo logInfo = new logInfo();
        // populate the first entry (the plain info message)
        logInfo.setTimestamp(timestampStr);
        logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
        logInfo.setLevel("INFO");
        logInfo.setService(serviceName);
        logInfo.setThread(currentThreadName);
        logInfo.setClazz("com.summit.controller.logTestController.logInfo[26]");
        logInfo.setMessage("这里是info测试");
        logInfo.setStackTrace("");


        logInfo logInfo1 = new logInfo();
        // populate the second entry (the JSON-serialized Person)
        logInfo1.setTimestamp(timestampStr);
        logInfo1.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
        logInfo1.setLevel("INFO");
        logInfo1.setService(serviceName);
        logInfo1.setThread(currentThreadName);
        logInfo1.setClazz("com.summit.controller.logTestController.logInfo[26]");
        logInfo1.setMessage(gson.toJson(person));
        logInfo1.setStackTrace("");


        // add the entries to the list
        logInfos.add(logInfo);
        logInfos.add(logInfo1);

        return JSONObject.toJSONString(logInfos);
    }

    @GetMapping("/logError")
    public Object logError() {
        log.error("这里是error测试");

        logInfo logInfo = new logInfo();
        logInfo logInfo1 = new logInfo();
        // list that will hold the log entries
        ArrayList<logInfo> logInfos = new ArrayList<>();

        try {
            int i = 4 / 0;
        } catch (Exception e) {
            log.error("An error occurred",e);
            // capture the stack trace as a string
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            e.printStackTrace(pw);
            String stackTrace = sw.toString();
            // store it on the logInfo entry
            logInfo1.setStackTrace(stackTrace);
        }

        // Build the response for the front end; it mirrors the log file record format.
        // current timestamp in milliseconds
        long timestamp = System.currentTimeMillis();
        // format the timestamp as an ISO 8601 string
        SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        String timestampStr = DateUtil.format(new Date(timestamp), pdf);

        // current date formatted with the log pattern
        Date date = DateUtil.date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String dateTime = sdf.format(date);

        // name of the current thread
        Thread currentThread = Thread.currentThread();
        String currentThreadName = currentThread.getName();


        // populate the first entry (the plain error message)
        logInfo.setTimestamp(timestampStr);
        logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
        logInfo.setLevel("ERROR");
        logInfo.setService(serviceName);
        logInfo.setThread(currentThreadName);
        logInfo.setClazz("com.summit.controller.logTestController.logError[40]");
        logInfo.setMessage("这里是error测试");
        logInfo.setStackTrace("");

        // populate the second entry (carries the stack trace)
        logInfo1.setTimestamp(timestampStr);
        logInfo1.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
        logInfo1.setLevel("ERROR");
        logInfo1.setService(serviceName);
        logInfo1.setThread(currentThreadName);
        logInfo1.setClazz("com.summit.controller.logTestController.logError[40]");
        logInfo1.setMessage("An error occurred");

        logInfos.add(logInfo);
        logInfos.add(logInfo1);

        return JSONObject.toJSONString(logInfos);
    }

    @GetMapping("/logWarn")
    public Object logWarn(){
        log.warn("这里是warn测试");

        // Build the response for the front end; it mirrors the log file record format.
        // current timestamp in milliseconds
        long timestamp = System.currentTimeMillis();
        // format the timestamp as an ISO 8601 string
        SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        String timestampStr = DateUtil.format(new Date(timestamp), pdf);

        // current date formatted with the log pattern
        Date date = DateUtil.date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String dateTime = sdf.format(date);

        // name of the current thread
        Thread currentThread = Thread.currentThread();
        String currentThreadName = currentThread.getName();

        logInfo logInfo = new logInfo();
        // populate the logInfo fields
        logInfo.setTimestamp(timestampStr);
        logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
        logInfo.setLevel("WARN");
        logInfo.setService(serviceName);
        logInfo.setThread(currentThreadName);
        logInfo.setClazz("com.summit.controller.logTestController.logWarn[20]");
        logInfo.setMessage("这里是warn测试");
        logInfo.setStackTrace("");

        return JSONObject.toJSONString(logInfo);
    }
}
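
The controller relies on Person and logInfo being simple getter/setter POJOs (they are not listed in this article). To drive the three endpoints, something like the following can be used; the port is an assumption (server.port, 8080 by default):

# INFO entry plus a JSON-serialized Person
curl http://localhost:8080/t1/logging
# ERROR entry with a stack trace from the deliberate division by zero
curl http://localhost:8080/t1/logError
# WARN entry
curl http://localhost:8080/t1/logWarn

The same entries should then show up under ./logs/summit-cloud-log/ and, once shipped, in Kibana.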

Cloud Project Integration

Log Pipeline

*.log --> Filebeat --> Logstash --> stored in Elasticsearch --> displayed in Kibana

Setup Steps

1. Deploy each ELFK component
2. Update the project's logback configuration
3. Update the ELFK configuration files and wire the components together
4. Point Filebeat at the project logs
5. Verify in Kibana
6. Add scheduled policies to the startup scripts

ELFK Deployment

  • ELFK is deployed with Docker; in this project Elasticsearch, Kibana, and Logstash are all started by scripts
  • Filebeat is started with the docker run command below; once it is up, use docker ps to confirm the container is running (see the check after the command):
## The first two mounts are standard container configuration mounts and are not modified
## The third mount is the log directory of the summit backend project (to let Filebeat scan files on the host, they must be mounted into the container, and the in-container paths are then used as inputs)
## The fourth mount is the log directory of the summit-cloud-log demo (this line can be dropped when reusing the command elsewhere)

docker run -d --name dev_filebeat --restart=always \
-v /home/sunmi/config/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v /home/sunmi/config/filebeat/log:/var/log/messages \
-v /home/docker_data/summit-cloud/projectLog:/usr/share/filebeat/summit-cloud-log \
-v /home/docker_data/summit-log/projectLog/summit-cloud-log:/home/docker_data/summit-log/projectLog/summit-cloud-log \
docker.elastic.co/beats/filebeat:7.10.1
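
As mentioned above, a quick way to confirm the container came up and is harvesting the expected files:

# the container should be listed as running
docker ps --filter "name=dev_filebeat"
# follow the filebeat log; harvester lines show which files are being read
docker logs -f dev_filebeat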

Updating the Project logback Configuration

logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <springProperty scope="context" name="spring.application.name" source="spring.application.name"/>

    <property name="logging.file.path" value="./logs"/>
    <property name="logging.history.file.path" value="./logs/history" />
    <property name="logback.application.name" value="${spring.application.name}"/>

    <!-- Default console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- output layout -->
        <layout class="ch.qos.logback.classic.PatternLayout">
            <!-- formatted output: %d date, %thread thread name, %-5level level padded to 5 chars, %msg message, %n newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
            </pattern>
        </layout>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file><!-- log file location and name -->
            ${logging.file.path}/${logback.application.name}.log
        </file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>GMT+8</timeZone>
                </timestamp>
                <!-- write each record as JSON; JSON makes later parsing much easier -->
                <pattern>
                    <pattern>
                        {
                        "dateTime": "%d[yyyy-MM-dd HH:mm:ss .SSS]",
                        "level":"%level",
                        "service":"${logback.application.name}",
                        "thread":"%thread",
                        "class":"%logger.%method[%line]",
                        "message" :"%message",
                        "stackTrace":"%exception"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- time-based rolling policy: roll daily -->
            <FileNamePattern>${logging.history.file.path}/${logback.application.name}.%d{yyyy-MM-dd}.log
            </FileNamePattern><!-- location and name pattern for rolled files -->
            <MaxHistory>30</MaxHistory><!-- keep the last 30 days of logs -->
            <totalSizeCap>300MB</totalSizeCap><!-- total size of rolled logs capped at 300MB; older archives are removed beyond that -->
        </rollingPolicy>
    </appender>

    <!-- per-environment configuration -->
    <springProfile name="dev">
        <root level="INFO">
            <appender-ref ref="STDOUT"/>
        </root>
    </springProfile>

    <springProfile name="test">
        <root level="INFO">
            <appender-ref ref="FILE"/>
        </root>
    </springProfile>

    <springProfile name="pre">
        <root level="INFO">
            <appender-ref ref="FILE"/>
        </root>
    </springProfile>

    <springProfile name="prod">
        <root level="INFO">
            <appender-ref ref="FILE"/>
        </root>
    </springProfile>
</configuration>

Note: every microservice module that needs log monitoring uses this same logback configuration. The service name is picked up from each module's application configuration; everything else is shared as-is.

Updating the ELFK Configuration and Wiring the Components Together

filebeat.yml (the pre-environment filebeat.yml is used as the example here; the inputs and the output are the parts that matter)
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

# ============================== Filebeat inputs ===============================

filebeat.inputs:
- type: log
  id: summit-cloud-web-server-pre  
  enabled: true
  paths:
  # note: these paths are inside the container, i.e. the directories mounted in the docker run command above
   - /usr/share/filebeat/summit-cloud-log/summit-cloud-web-server.log
  # parse and forward the entries as JSON
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    # service name and environment tags, used later by Kibana and Elasticsearch to separate indices
    servicename: summit-cloud-web-server
    profiles: pre
- type: log
  id: summit-cloud-user-module-pre
  enabled: true
  paths:
   - /usr/share/filebeat/summit-cloud-log/summit-cloud-user-module.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    servicename: summit-cloud-user-module
    profiles: pre
- type: log
  id: summit-cloud-recipe-module-pre
  enabled: true
  paths:
   - /usr/share/filebeat/summit-cloud-log/summit-cloud-recipe-module.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    servicename: summit-cloud-recipe-module
    profiles: pre
- type: log
  id: summit-cloud-device-module-pre
  enabled: true
  paths:
   - /usr/share/filebeat/summit-cloud-log/summit-cloud-device-module.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    servicename: summit-cloud-device-module
    profiles: pre
- type: log
  id: summit-cloud-api-server-pre
  enabled: true
  paths:
   - /usr/share/filebeat/summit-cloud-log/summit-cloud-api-server.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    servicename: summit-cloud-api-server
    profiles: pre
# input for the summit-cloud-log demo project's log files
- type: log
  id: summit-cloud-log-test
  enabled: true
  paths:
    - /home/docker_data/summit-log/projectLog/summit-cloud-log/*.log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    servicename: summit-cloud-log
    profiles: test

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false


# ================================== General ===================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
#setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  #host: "localhost:5601"

  # Kibana Space ID
  # ID of the Kibana Space into which the dashboards should be loaded. By default,
  # the Default Space will be used.
  #space.id:

# =============================== Elastic Cloud ================================

# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:

# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
  #Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
# send events to Logstash at the address below
output.logstash:
  # The Logstash hosts
  hosts: '[logstash address]:[logstash mapped port]'

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

# ================================== Logging ===================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]

# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster.  This requires xpack monitoring to be enabled in Elasticsearch.  The
# reporting is disabled by default.

# Set to true to enable the monitoring reporter.
#monitoring.enabled: false

# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:

# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:

# ============================== Instrumentation ===============================

# Instrumentation support for the filebeat.
#instrumentation:
    # Set to true to enable instrumentation of filebeat.
    #enabled: false

    # Environment in which filebeat is running on (eg: staging, production, etc.)
    #environment: ""

    # APM Server hosts to report instrumentation results to.
    #hosts:
    #  - http://localhost:8200

    # API Key for the APM Server(s).
    # If api_key is set then secret_token will be ignored.
    #api_key:

    # Secret token for the APM Server(s).
    #secret_token:


# ================================= Migration ==================================

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
logstash.conf
input {
  beats {
    port => 5044  # port inside the logstash container, not the host-mapped port
    host => "0.0.0.0"
    client_inactivity_timeout => 36000
  }
}

output {
  elasticsearch {
    hosts => ["[ip]:[mapped port]"]
    action => "index"
    # index name; servicename and profiles are fields set by the filebeat config
    # a date suffix could be appended, but that would create too many indices, so it was dropped
    index => "%{[fields][servicename]}-%{[fields][profiles]}"
  }
  stdout { codec => rubydebug }
}
kibana.yml
#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://[ip]:[mapped port]" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
Elasticsearch
# node startup command
docker run -d --name $CONTAINER_NAME -p $ELASTICSEARCH_PORT1:9200 \
-p $ELASTICSEARCH_PORT2:9300 -e "discovery.type=single-node" -v /home/sunmi/config/elasticsearch/elasticsearch.yml:\
/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:7.10.1

Note: the project also runs an elasticsearch-head node, mapped on port 9100:9100; visiting that port shows the current Elasticsearch status and index information.

Pointing Filebeat at the Project Logs

  • The filebeat container was started with the mount /home/docker_data/summit-cloud/projectLog:/usr/share/filebeat/summit-cloud-log,
    where /home/docker_data/summit-cloud/projectLog is the directory the project writes its logs to and /usr/share/filebeat/summit-cloud-log is the mount point inside the container
  • In the pre environment the project log directory contains five log files. Since the entries are shipped and parsed as JSON, make sure each log line looks like the example below; if it does not, check the pattern section of the project's logback configuration and restart the project:
{"@timestamp":"2023-08-15T08:49:44.975+08:00","dateTime":"2023-08-15 08:49:44,975[yyyy-MM-dd HH:mm:ss .SSS]","level":"WARN","service":"summit-cloud-device-module","thread":"http-nio-20005-exec-10","class":"org.springframework.web.servlet.PageNotFound.noHandlerFound[1282]","message":"No mapping for GET /","stackTrace":""}
  • Check that Logstash receives the events and forwards them to Elasticsearch, and that the corresponding index exists in Elasticsearch (see the checks below)
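
Both checks can be done from the shell; a sketch using the same [ip]/[mapped port] placeholders as the rest of this section (the logstash container name depends on how it was started):

# watch logstash echo every forwarded event (stdout { codec => rubydebug } in logstash.conf)
docker logs -f logstash
# list the per-service indices, e.g. summit-cloud-web-server-pre
curl -s 'http://[ip]:[mapped port]/_cat/indices?v' | grep summit-cloud
# peek at one document to confirm the JSON fields arrived intact
curl -s 'http://[ip]:[mapped port]/summit-cloud-web-server-pre/_search?size=1&pretty'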

Verifying in Kibana

  • Open the project's Kibana UI; if no index pattern exists yet, create one first so the log data can be matched
  • Search in Discover with a suitable filter and confirm the expected entries come back; open an entry and check its JSON tab to verify the field values are correct

Adding Scheduled Policies to the Startup Scripts

  • Creating index patterns by hand is tedious, and the project requires data older than 30 days to be deleted permanently, so both needs are handled with scheduled tasks
  • Two different script designs cover two different situations (a sample crontab entry follows the second script):
    1. When indices carry a date suffix, expired indices need to be deleted at a fixed time every day and Kibana index patterns need to be added for new indices. To keep index patterns fully in sync, a custom log line would have to be written into each new log file right after every rollover so that the Elasticsearch index is refreshed.
    2. When indices carry no date suffix, no daily index-pattern creation is needed; it is enough to scan the data in every index once a day and delete documents that have expired.
  • This project uses the second option; the first is kept as a reference.

Option 1

#!/bin/bash


### The action type is either add or del
### All Elasticsearch indices are written to type_log.txt and then read back one by one to create Kibana index patterns. A new index can simply be appended to type_log.txt before running the script; existing indices are not affected.


#add = create index patterns, del = delete them
action=add

# Elasticsearch connection settings
ES_HOST="[ip]"
ES_PORT="[port]"

# Kibana connection settings
KIBANA_HOST="[ip]"
KIBANA_PORT="[port]"


#index_pattern = ""   从type_log.txt文件中读取所有索引的type,每当有新的
#ID = index_pattern
domain_name_file=/home/sunmi/config/kibana/index/type_log.txt
domain_name_file_pre=/home/sunmi/config/kibana/index/type_log_pre.txt

time_field="@timestamp"

#date=`date +%Y-%m`
#current date
CURRENT_DATE=$(date "+%Y-%m-%d")



#log files recording each run
log_file=/home/sunmi/config/kibana/index/update_index_log.txt
log_del_file=/home/sunmi/config/kibana/index/delete/delete_index_log.txt
echo "${CURRENT_DATE}" >> ${log_file}
echo "${CURRENT_DATE}" >> ${log_del_file}


#delete expired indices first
# list all indices
indices=$(curl -s -XGET "http://${ES_HOST}:${ES_PORT}/_cat/indices?format=json" | jq -r '.[].index')

# iterate over the indices
for index_name in $indices; do
  # extract the date from the index name with a regex
  if [[ $index_name =~ [0-9]{4}\.[0-9]{2}\.[0-9]{2} ]]; then
    date_str=${BASH_REMATCH[0]}
    date_str_hyphen=$(echo "$date_str" | sed 's/\./-/g')
    index_date=$(date -d "$date_str_hyphen" +%s)
    current_date=$(date -d "$CURRENT_DATE" +%s)

    # days between the index date and today
    difference=$(( ($current_date - $index_date) / (24 * 60 * 60) ))

    # if the index is more than 30 days old, delete it and its Kibana index pattern
    if [ "$difference" -gt 30 ]; then
      # delete the Elasticsearch index
      curl -XDELETE "http://${ES_HOST}:${ES_PORT}/$index_name"
      echo "Elasticsearch索引 $index_name 已被删除" | tee -a "$log_del_file"

      # delete the matching Kibana index pattern
      pattern_id=$(curl -s -XGET "http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/_find?type=index-pattern&search_fields=title&search=$index_name" | jq -r '.saved_objects[0].id')
      if [ ! -z "$pattern_id" ]; then
        curl -XDELETE "http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/index-pattern/$pattern_id" -H 'kbn-xsrf: true'
        echo "Kibana索引模式 $index_name 已被删除" | tee -a "$log_del_file"
      else 
        echo "未找到与索引 $index_name 相关的Kibana索引模式" | tee -a "$log_del_file"
      fi
    else
      echo "$index_name 索引未超过指定期限,不予删除" | tee -a "$log_del_file"
    fi
  fi
done





#index-pattern creation
#fetch the latest index list from Elasticsearch
curl -XGET "http://${ES_HOST}:${ES_PORT}/_cat/indices?v" > ${domain_name_file_pre}
awk 'NR>1 {print $3}' ${domain_name_file_pre} > ${domain_name_file}

#intermediate file holding the useful lines of type_log.txt together with their line numbers
middle_file=/home/sunmi/config/kibana/index/middle.txt
grep -E -n '^[[:alnum:].]' "${domain_name_file}" > "${middle_file}"


# extract the index names and register them in Kibana
domain_name_num=`wc -l ${middle_file} | awk '{print $1}'`
for((i=1;i<=${domain_name_num};i++));do
    domain_name_type=`sed -n "${i}p" ${middle_file}| awk -F':' '{print $2}'`
    ###create the new index pattern
    if [ $action == "add" ];then
        curl -f -XPOST -H 'Content-Type: application/json' -H 'kbn-xsrf: anything' \
    "http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/index-pattern/${domain_name_type}" -d"{\"attributes\":{\"title\":\"${domain_name_type}\",\"timeFieldName\":\"@timestamp\"}}" >> ${log_file}
    else
        echo "action errror" >> ${log_file}
        exit 100
    fi

    #log every operation so the log file can be filtered after each run
    if [ $? -eq 0 ];then
        echo "success ${domain_name_type}" >> ${log_file}
    else
        echo "error ${domain_name_type}" >> ${log_file}
    fi
done


#set the default index pattern
#curl -f -XPOST -H 'Content-Type: application/json' -H 'kbn-xsrf: anything' http://localhost:5601/api/kibana/settings/defaultIndex -d "{\"value\":\"logstash-app_www_${date}\"}" >> ${log_file}



#mv -f  /home/sunmi/config/kibana/index/middle.txt /home/sunmi/config/kibana/index/tmp/

Option 2

#!/bin/bash

# Elasticsearch connection settings
HOST="[ip]"
PORT="[port]"

domain_index_file=/home/sunmi/config/kibana/index/index.txt
log_file=/home/sunmi/config/kibana/index/index_log.txt
#current date
CURRENT_DATE=$(date "+%Y-%m-%d")

echo "${CURRENT_DATE}" >> ${log_file}
echo "${CURRENT_DATE}" > ${domain_index_file}

# get the names of all indices
indices=$(curl -s "${HOST}:${PORT}/_cat/indices?v" | awk '{print $3}' | tail -n +2)
# iterate over each index
for index in $indices; do
  echo "${index}" >> ${domain_index_file}
  # fetch the id and @timestamp (a field inside _source) of every returned document in the index
  data=$(curl -s "${HOST}:${PORT}/$index/_search" | jq -r '.hits.hits[] | ._id + " " + ._source["@timestamp"]')

  # iterate over each document
  while read -r id timestamp; do
    # current epoch seconds
    now=$(date +%s)
    # document timestamp as epoch seconds
    timestamp=$(date -d "$timestamp" +%s)
    # age in days
    diff=$(( (now - timestamp) / 86400 ))

    # delete the document if it is more than 30 days old
    if [ $diff -gt 30 ]; then
      curl -X DELETE "${HOST}:${PORT}/$index/_doc/$id"
      echo "$index : $id 删除成功" | tee -a "$log_file"
    else
      echo "$index : $id 未超过期限,不予删除" | tee -a "$log_file"
    fi
  done <<< "$data"
done
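
Either script can then be run from cron; a minimal sketch, assuming the chosen script is saved as /home/sunmi/config/kibana/index/clean_index.sh (the script path and the time of day here are assumptions):

# crontab -e : run the cleanup every day at 02:00 and append its output to a log file (hypothetical paths)
0 2 * * * /bin/bash /home/sunmi/config/kibana/index/clean_index.sh >> /home/sunmi/config/kibana/index/cron.log 2>&1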