Toasobi
ELFK企业级整合使用
本文最后更新于2023年09月07日,已超过485天没有更新。如果文章内容或图片资源失效,请留言反馈,我会及时处理,谢谢!
日志监控模块
日志监控模块教学案例,技术栈为 springboot,elasticSearch,kibana,logstash,filebeat
项目结构
summit-cloud-log
│ .gitkeep
│ pom.xml
│ README.md
│ summit-cloud-log.iml
│
├─logs #项目生成日志文件存放处
│ └─summit-cloud-log
│ error.log
│ info.log
│ warn.log
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─summit
│ │ │ │ SummitCloudLogApplication.java #启动类
│ │ │ │
│ │ │ ├─controller
│ │ │ │ logTestController.java #测试接口
│ │ │ │
│ │ │ └─entity
│ │ │ Person.java #测试用实体类
│ │ │
│ │ └─resources
│ │ application.properties
│ │ application.yaml
│ │ logback-spring.xml #logback配置文件
│ │
│ └─test
│ └─java
│ └─com
│ └─summit
│ SummitCloudLogApplicationTests.java
配置讲解
pom.xml
<!--logstash-logback-encoder-->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.0.1</version>
</dependency>
logstash.conf
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
output {
elasticsearch {
hosts => ["43.138.199.12:9200"]
action=>"index"
index => "%{[fields][servicename]}-%{+yyyy.MM.dd}"
}
stdout{
codec =>rubydebug
}
}
filebeat.yaml
案例的filebeat是在localhost上面跑的,扫描的文件夹即是本地文件夹
<br/>在后面summit-cloud项目整合模块会有在服务器部署filebeat指导
# ============================== Filebeat inputs ===============================
filebeat.inputs:
- type: log
enabled: true
paths:
- C:\Users\123\Desktop\summit-cloud-log\logs\summit-cloud-log\*.log
fields:
servicename: summit-cloud-log
multiline:
pattern: '^\{'
negate: true
match: after
timeout: 5s
# ============================== Filebeat modules ==============================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# Period on which files under path should be checked for changes
#reload.period: 10s
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
# ================================== Outputs ===================================
# ------------------------------ Logstash Output -------------------------------
output.logstash:
# The Logstash hosts
hosts: ["43.138.199.12:5044"]
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
docker-compose 文件
version: "2.2"
services:
elasticsearch:
image: elasticsearch:7.4.2
container_name: elasticsearch
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx1g # 设置最大内存为1g
ports:
- 9200:9200
- 9300:9300
networks:
- elk
kibana:
image: kibana:7.4.2
container_name: kibana
depends_on:
- elasticsearch
environment:
- ELASTICSEARCH_HOSTS=http://43.138.199.12:9200
ports:
- 5601:5601
networks:
- elk
logstash:
image: logstash:7.14.2
container_name: logstash
environment:
- node.name=es01
- LS_JAVA_OPTS=-Xms512m -Xmx1g # 设置最小内存为512MB,最大内存为1g
volumes:
- /home/logstash/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- 5044:5044
networks:
- elk
networks:
elk:
driver: bridge
application.properties
#指定项目开发环境
spring.profiles.active=dev
#指定logback-spring.xml位置
logging.config=classpath:logback-spring.xml
application.yaml
spring:
application:
name: summit-cloud-log #指定项目名称,后面生成文件等log配置需要用到
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!-- 变量配置 -->
<!--
格式化输出:
%d表示日期,
%thread表示线程名,
%-5level:级别从左显示5个字符宽度
%msg:日志消息,%n是换行符
%logger{36} 表示 Logger 名字最长36个字符
-->
<springProperty scope="context" name="spring.application.name" source="spring.application.name"/>
<property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
<!-- 定义日志存储的路径,不要配置相对路径 -->
<property name="FILE_PATH" value="./logs/${spring.application.name}"/>
<!-- 服务名 -->
<property name="FILE_NAME" value="${spring.application.name}"/>
<!-- 默认的控制台日志输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!--展示格式 layout -->
<layout class="ch.qos.logback.classic.PatternLayout">
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符 -->
<pattern>${LOG_PATTERN}</pattern>
</layout>
</appender>
<!--
这个会打印出所有的info及以下级别的信息,每次大小超过size,
则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档
-->
<appender name="RollingFileInfo" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${FILE_PATH}/info.log</file>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<charset>UTF-8</charset>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
<!--interval属性用来指定多久滚动一次,默认是1天-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-INFO.log.gz</FileNamePattern>
<MaxHistory>30</MaxHistory><!--保留近30天的日志 -->
<maxFileSize>20MB</maxFileSize><!--用来指定日志文件的上限大小-->
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 这个会打印出所有的warn及以上级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档-->
<appender name="RollingFileWarn" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${FILE_PATH}/warn.log</file>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<charset>UTF-8</charset>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
<!--interval属性用来指定多久滚动一次,默认是1天-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-WARN.log.gz</FileNamePattern>
<MaxHistory>30</MaxHistory><!--保留近30天的日志 -->
<maxFileSize>20MB</maxFileSize><!--用来指定日志文件的上限大小-->
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>WARN</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 这个会打印出所有的error及以上级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档-->
<appender name="RollingFileError" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${FILE_PATH}/error.log</file>
<!--interval属性用来指定多久滚动一次,默认是1天-->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<charset>UTF-8</charset>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${FILE_PATH}/%d{yyyy-MM}/summit-cloud-log-%d{yyyy-MM-dd}.%i-ERROR.log.gz</FileNamePattern>
<MaxHistory>30</MaxHistory><!--保留近30天的日志 -->
<maxFileSize>20MB</maxFileSize><!--用来指定日志文件的上限大小-->
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!--Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。-->
<!--然后定义loggers,只有定义了logger并引入的appender,appender才会生效-->
<!-- 多环境配置 -->
<springProfile name="dev">
<!--过滤掉spring和mybatis的一些无用的DEBUG信息-->
<logger name="org.mybatis" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<!--监控系统信息-->
<!--若是additivity设为false,则 子Logger 只会在自己的appender里输出,而不会在 父Logger 的appender里输出。-->
<Logger name="top.fate" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</Logger>
<root level="info">
<appender-ref ref="STDOUT"/>
<appender-ref ref="RollingFileInfo"/>
<appender-ref ref="RollingFileWarn"/>
<appender-ref ref="RollingFileError"/>
</root>
</springProfile>
<springProfile name="test">
<root level="info">
<appender-ref ref="RollingFileInfo"/>
<appender-ref ref="RollingFileWarn"/>
<appender-ref ref="RollingFileError"/>
</root>
</springProfile>
</configuration>
logback 配置注意事项
1.logback 在启动时:
- 在 classpath 中寻找
logback-test.xml
文件- 如果找不到 logback-test.xml,则在 classpath 中寻找
logback.groovy
文件- 如果找不到 logback.groovy,则在 classpath 中寻找
logback.xml 文件
- 如果上述的文件都找不到,则 logback 会使用 JDK 的 SPI 机制查找
META-INF/services/ch.qos.logback.classic.spi.Configurator
中的 logback 配置实现类,这个实现类必须实现 Configuration 接口,使> 用它的实现来进行配置- 如果上述操作都不成功,logback 就会使用它自带的 BasicConfigurator 来配置,并将日志
输出到 console
2.logback 打印级别
TRACE<DEBUG<INFO<WARN<ERROR
,默认 DEBUG
3.logback root,logger 配置
- root 和 logger 有些类似与路由匹配,root 作为最后一层但是一定会执行匹配匹配项,logger 可以看作一个个路由
- logger 的 name 属性需要指定包名或者类名,表示匹配一个包/类下的所有日志信息
additivity
参数通常用于避免日志事件在多个 logger 中重复输出的情况:
- 当 additivity 属性设置为
true
时,日志事件不仅会被当前 logger 处理,也会被传递给父节点 logger 处理- 当 additivity 属性设置为
false
时,日志> 事件只会被当前 logger 处理,不会被传递给父节点 logger 处理
项目测试
使用以下接口进行 log 打印输出日志,检测本地日志是否输出正确
同时观察服务器上的 logstash 输出是否正确,kibana 上是否可以检索到对应数据
package com.summit.controller;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.summit.common.logInfo;
import com.summit.entity.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@RestController
@RequestMapping("/t1")
@Slf4j
public class logTestController {
@Value("${spring.application.name}")
private String serviceName; // 从application.properties中获取服务名称
@GetMapping("/logging")
public Object logInfo() {
Person person = new Person();
person.setName("omerta");
person.setAge(20);
person.setAddress("加利福尼亚");
Gson gson = new Gson();
log.info("这里是info测试");
log.info(gson.toJson(person));
//创建返回给前端的数据,需要返回日志文件记录格式
// 创建一个ArrayList类型的列表
ArrayList<logInfo> logInfos = new ArrayList<>();
// 使用System类获取当前时间戳(以毫秒为单位)
long timestamp = System.currentTimeMillis();
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串
SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串,传入SimpleDateFormat对象作为第二个参数
String timestampStr = DateUtil.format(new Date(timestamp), pdf);
// 使用DateUtil类获取当前日期
Date date = DateUtil.date();
// 使用SimpleDateFormat类将日期格式化为指定格式
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String dateTime = sdf.format(date);
// 获取当前线程的对象
Thread currentThread = Thread.currentThread();
// 获取当前线程的名称
String currentThreadName = currentThread.getName();
logInfo logInfo = new logInfo();
// 设置到LogInfo对象中
logInfo.setTimestamp(timestampStr);
logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
logInfo.setLevel("INFO");
logInfo.setService(serviceName);
logInfo.setThread(currentThreadName);
logInfo.setClazz("com.summit.controller.logTestController.logInfo[26]");
logInfo.setMessage("这里是info测试");
logInfo.setStackTrace("");
logInfo logInfo1 = new logInfo();
// 设置到LogInfo对象中
logInfo1.setTimestamp(timestampStr);
logInfo1.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
logInfo1.setLevel("INFO");
logInfo1.setService(serviceName);
logInfo1.setThread(currentThreadName);
logInfo1.setClazz("com.summit.controller.logTestController.logInfo[26]");
logInfo1.setMessage(gson.toJson(person));
logInfo1.setStackTrace("");
// 将第一个logInfo对象添加到列表中
logInfos.add(logInfo);
logInfos.add(logInfo1);
return JSONObject.toJSONString(logInfos);
}
@GetMapping("/logError")
public Object logError() {
log.error("这里是error测试");
logInfo logInfo = new logInfo();
logInfo logInfo1 = new logInfo();
// 创建一个ArrayList类型的列表
ArrayList<logInfo> logInfos = new ArrayList<>();
try {
int i = 4 / 0;
} catch (Exception e) {
log.error("An error occurred",e);
//设置报错信息
// 创建一个StringWriter对象
StringWriter sw = new StringWriter();
// 创建一个PrintWriter对象,并传入StringWriter对象作为参数
PrintWriter pw = new PrintWriter(sw);
// 调用异常对象的printStackTrace方法,并传入PrintWriter对象作为参数
e.printStackTrace(pw);
// 将StringWriter对象转换为字符串
String stackTrace = sw.toString();
// 设置到logInfo对象中
logInfo1.setStackTrace(stackTrace);
}
//创建返回给前端的数据,需要返回日志文件记录格式
// 使用System类获取当前时间戳(以毫秒为单位)
long timestamp = System.currentTimeMillis();
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串
SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串,传入SimpleDateFormat对象作为第二个参数
String timestampStr = DateUtil.format(new Date(timestamp), pdf);
// 使用DateUtil类获取当前日期
Date date = DateUtil.date();
// 使用SimpleDateFormat类将日期格式化为指定格式
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String dateTime = sdf.format(date);
// 获取当前线程的对象
Thread currentThread = Thread.currentThread();
// 获取当前线程的名称
String currentThreadName = currentThread.getName();
// 设置到LogInfo对象中
logInfo.setTimestamp(timestampStr);
logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
logInfo.setLevel("ERROR");
logInfo.setService(serviceName);
logInfo.setThread(currentThreadName);
logInfo.setClazz("com.summit.controller.logTestController.logError[40]");
logInfo.setMessage("这里是error测试");
logInfo.setStackTrace("");
// 设置到LogInfo对象中
logInfo1.setTimestamp(timestampStr);
logInfo1.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
logInfo1.setLevel("ERROR");
logInfo1.setService(serviceName);
logInfo1.setThread(currentThreadName);
logInfo1.setClazz("com.summit.controller.logTestController.logError[40]");
logInfo1.setMessage("An error occurred");
logInfos.add(logInfo);
logInfos.add(logInfo1);
return JSONObject.toJSONString(logInfos);
}
@GetMapping("/logWarn")
public Object logWarn(){
log.warn("这里是warn测试");
//创建返回给前端的数据,需要返回日志文件记录格式
// 使用System类获取当前时间戳(以毫秒为单位)
long timestamp = System.currentTimeMillis();
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串
SimpleDateFormat pdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
// 使用DateUtil类将时间戳格式化为ISO 8601标准的字符串,传入SimpleDateFormat对象作为第二个参数
String timestampStr = DateUtil.format(new Date(timestamp), pdf);
// 使用DateUtil类获取当前日期
Date date = DateUtil.date();
// 使用SimpleDateFormat类将日期格式化为指定格式
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String dateTime = sdf.format(date);
// 获取当前线程的对象
Thread currentThread = Thread.currentThread();
// 获取当前线程的名称
String currentThreadName = currentThread.getName();
logInfo logInfo = new logInfo();
// 设置到LogInfo对象中
logInfo.setTimestamp(timestampStr);
logInfo.setDateTime(dateTime + "[yyyy-MM-dd HH:mm:ss .SSS]");
logInfo.setLevel("WARN");
logInfo.setService(serviceName);
logInfo.setThread(currentThreadName);
logInfo.setClazz("com.summit.controller.logTestController.logWarn[20]");
logInfo.setMessage("这里是warn测试");
logInfo.setStackTrace("");
return JSONObject.toJSONString(logInfo);
}
}
cloud项目整合
日志传输流程
*.log --> filebeat --> logstash --> elasticSearch保存 --> kibana显示
搭建流程
1.ELFK各项单独部署 <br/>
2.修改项目logback配置文件 <br/>
3.修改ELFK配置文件,ELFK互联 <br/>
4.filebeat监听项目日志 <br/>
5.进入Kibana进行测试 <br/>
6.启动脚本添加定时策略 <br/>
ELFK部署
- ELFK采用docker进行部署,项目中的elasticSearch,kibana,logstash均使用脚本部署
- filebeat采用docker命令部署,具体命令如下,部署完成后使用
docker ps
查看容器是否运行成功:## 前两个文件夹挂载是容器配置挂载,不做具体修改 ## 第三个文件夹挂载为summit后台项目日志文件夹挂载(注意要使filebeat监听扫描本机上文件,必须挂载到容器内部,然后使用容器内部路径进行输入) ## 第四个文件夹为summit-cloud-log日志案例的日志文件夹目录挂载(命令移植执行可以不需要该行挂载) docker run -d --name dev_filebeat --restart=always \ -v /home/sunmi/config/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \ -v /home/sunmi/config/filebeat/log:/var/log/messages \ -v /home/docker_data/summit-cloud/projectLog:/usr/share/filebeat/summit-cloud-log \ -v /home/docker_data/summit-log/projectLog/summit-cloud-log:/home/docker_data/summit-log/projectLog/summit-cloud-log \ docker.elastic.co/beats/filebeat:7.10.1
修改项目logback配置文件
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<springProperty scope="context" name="spring.application.name" source="spring.application.name"/>
<property name="logging.file.path" value="./logs"/>
<property name="logging.history.file.path" value="./logs/history" />
<property name="logback.application.name" value="${spring.application.name}"/>
<!-- 默认的控制台日志输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!--展示格式 layout -->
<layout class="ch.qos.logback.classic.PatternLayout">
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file><!-- 指定文件位置及名字 -->
${logging.file.path}/${logback.application.name}.log
</file>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>GMT+8</timeZone>
</timestamp>
<!--指定文件中日志记录格式为json格式,使用json格式传递在后续解析等方面具有优势-->
<pattern>
<pattern>
{
"dateTime": "%d[yyyy-MM-dd HH:mm:ss .SSS]",
"level":"%level",
"service":"${logback.application.name}",
"thread":"%thread",
"class":"%logger.%method[%line]",
"message" :"%message",
"stackTrace":"%exception"
}
</pattern>
</pattern>
</providers>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 滚动策略,按照时间滚动 --><!--日志文件输出的文件名:按天回滚 daily -->
<FileNamePattern>${logging.history.file.path}/${logback.application.name}.%d{yyyy-MM-dd}.log
</FileNamePattern><!-- 指定回滚后旧文件位置及名字 -->
<MaxHistory>30</MaxHistory><!--保留近30天的日志 -->
<totalSizeCap>300MB</totalSizeCap><!--指定了在执行回滚操作时,回滚日志文件的累计大小不应超过300MB,超过则清理旧的回滚日志 -->
</rollingPolicy>
</appender>
<!-- 多环境配置 -->
<springProfile name="dev">
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</springProfile>
<springProfile name="test">
<root level="INFO">
<appender-ref ref="FILE"/>
</root>
</springProfile>
<springProfile name="pre">
<root level="INFO">
<appender-ref ref="FILE"/>
</root>
</springProfile>
<springProfile name="prod">
<root level="INFO">
<appender-ref ref="FILE"/>
</root>
</springProfile>
</configuration>
注:所有需要日志监控的微服务模块统一采用该日志文件配置,其中服务名称引用各个模块application配置文件中指定配置,其他基本通用
修改ELFK配置文件,ELFK互联
filebeat.yml (这里使用pre环境下的filebeat.yaml举例,重点关注input和output)
###################### Filebeat Configuration Example #########################
# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html
# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.
# ============================== Filebeat inputs ===============================
filebeat.inputs:
- type: log
id: summit-cloud-web-server-pre
enabled: true
paths:
# 注意这里的path是容器内部的路径,也就是之前docker命令中挂载的目录
- /usr/share/filebeat/summit-cloud-log/summit-cloud-web-server.log
# 使用json格式进行解析传递
json.keys_under_root: true
json.overwrite_keys: true
fields:
#指定服务名和运行环境,方便之后kibana和elasticSearch区分索引
servicename: summit-cloud-web-server
profiles: pre
- type: log
id: summit-cloud-user-module-pre
enabled: true
paths:
- /usr/share/filebeat/summit-cloud-log/summit-cloud-user-module.log
json.keys_under_root: true
json.overwrite_keys: true
fields:
servicename: summit-cloud-user-module
profiles: pre
- type: log
id: summit-cloud-recipe-module-pre
enabled: true
paths:
- /usr/share/filebeat/summit-cloud-log/summit-cloud-recipe-module.log
json.keys_under_root: true
json.overwrite_keys: true
fields:
servicename: summit-cloud-recipe-module
profiles: pre
- type: log
id: summit-cloud-device-module-pre
enabled: true
paths:
- /usr/share/filebeat/summit-cloud-log/summit-cloud-device-module.log
json.keys_under_root: true
json.overwrite_keys: true
fields:
servicename: summit-cloud-device-module
profiles: pre
- type: log
id: summit-cloud-api-server-pre
enabled: true
paths:
- /usr/share/filebeat/summit-cloud-log/summit-cloud-api-server.log
json.keys_under_root: true
json.overwrite_keys: true
fields:
servicename: summit-cloud-api-server
profiles: pre
# summit-cloud-log案例的项目日志文件扫描
- type: log
id: summit-cloud-log-test
enabled: true
paths:
- /home/docker_data/summit-log/projectLog/summit-cloud-log/*.log
json.keys_under_root: true
json.overwrite_keys: true
fields:
servicename: summit-cloud-log
profiles: test
# ============================== Filebeat modules ==============================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# Period on which files under path should be checked for changes
#reload.period: 10s
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
# ================================== General ===================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false
# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:
# =================================== Kibana ===================================
# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
#setup.kibana:
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"
# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:
# =============================== Elastic Cloud ================================
# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).
# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:
# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
#Array of hosts to connect to.
#hosts: ["localhost:9200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
# ------------------------------ Logstash Output -------------------------------
# 指定输出到logstash中,指定logstash地址
output.logstash:
# The Logstash hosts
hosts: '[output地址]:[logstash映射端口]'
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# ================================= Processors =================================
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
# ================================== Logging ===================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]
# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster. This requires xpack monitoring to be enabled in Elasticsearch. The
# reporting is disabled by default.
# Set to true to enable the monitoring reporter.
#monitoring.enabled: false
# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:
# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:
# ============================== Instrumentation ===============================
# Instrumentation support for the filebeat.
#instrumentation:
# Set to true to enable instrumentation of filebeat.
#enabled: false
# Environment in which filebeat is running on (eg: staging, production, etc.)
#environment: ""
# APM Server hosts to report instrumentation results to.
#hosts:
# - http://localhost:8200
# API Key for the APM Server(s).
# If api_key is set then secret_token will be ignored.
#api_key:
# Secret token for the APM Server(s).
#secret_token:
# ================================= Migration ==================================
# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
logstash.conf
input {
beats {
port => 5044 /*这里指定的是logstash容器内部的端口,不是主机映射端口*/
host => "0.0.0.0"
client_inactivity_timeout => 36000
}
}
output {
elasticsearch {
hosts => ["[ip]:[映射端口]"]
action=>"index"
/*指定索引名称,其中servicename和profiles是filebeat配置传递过来的数据*/
index => "%{[fields][servicename]}-%{[fields][profiles]}" /*后面还可以跟上日期,但是由于这样会产生过多索引,故弃用*/
}
stdout { codec => rubydebug }
}
kibana.yml
#
# ** THIS IS AN AUTO-GENERATED FILE **
#
# Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://[ip]:[映射端口]" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticSearch
#节点启动命令
docker run -d --name $CONTAINER_NAME -p $ELASTICSEARCH_PORT1:9200 \
-p $ELASTICSEARCH_PORT2:9300 -e "discovery.type=single-node" -v /home/sunmi/config/elasticsearch/elasticsearch.yml:\
/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:7.10.1
注:项目中还启动了一个elasticSearch-head头节点,端口为9100:9100,访问该端口可以查看当前elasticSearch状态与索引信息
filebeat监听项目日志
- 之前filebeat容器启动时使用
/home/docker_data/summit-cloud/projectLog:/usr/share/filebeat/summit-cloud-log
命令挂载目录
其中/home/docker_data/summit-cloud/projectLog
为项目日志文件输出文件夹,/usr/share/filebeat/summit-cloud-log
为容器内挂载文件夹- 在pre环境下,项目日志文件夹下一共有5个日志文件,因为使用json进行传输解析,请确保日志文件内日志以下面形式输出,如果不是,请确保项目logback的pattern标签配置正确,并重新启动项目:
{"@timestamp":"2023-08-15T08:49:44.975+08:00","dateTime":"2023-08-15 08:49:44,975[yyyy-MM-dd HH:mm:ss .SSS]","level":"WARN","service":"summit-cloud-device-module","thread":"http-nio-20005-exec-10","class":"org.springframework.web.servlet.PageNotFound.noHandlerFound[1282]","message":"No mapping for GET /","stackTrace":""}
- 检查logstash是否接受并传递日志信息给elasticSearch,检查elasticSearch中是否存在该索引
进入Kibana进行测试
- 进入到项目kibana界面,若还未创建索引模式则先创建索引模式,通过索引模式匹配日志信息
- 在Discover处检索信息,选择合适的filiter条件查看是否能成功检索。点开日志信息查看Json板块,检查格式数据是否正确
启动脚本添加定时策略
- 由于索引模式需要手动创建比较繁琐,并且项目规定超过30天的数据进行永久删除,于是想到使用定时任务完成需求
- 本次搭建基于两种不同需求实现不同脚本方案: <br/>
1.
当索引有时间后缀时,需要每天某个时刻定时删除过期索引,并添加新增索引的kibana索引模式。如若想实现索引模式高同步性,需要在每次日志回滚后自动往新日志文件中写入一行自定义日志,以更新elasticSearch索引<br/>
2.
当索引没有时间后缀时,不需要每天定时添加索引模式,只需要每天固定检索一次所有索引下数据时间信息,超时的数据进行删除处理- 本项目采用第二种方案,第一种方案解决思路可供参考
第一种方案
#!/bin/bash
###操作类型分为add和del
###所有elasticsearch中的索引写入type_log.txt文件中,然后顺序取出并创建kibana索引。如果新增索引,可直接写入type_log.txt并执行脚本即可。对已经存在的索引不会存在影响。
#新增索引add,删除索引del
action=add
# Elasticsearch连接配置
ES_HOST="[ip]"
ES_PORT="[port]"
# Kibana连接配置
KIBANA_HOST="[ip]"
KIBANA_PORT="[port]"
#index_pattern = "" 从type_log.txt文件中读取所有索引的type,每当有新的
#ID = index_pattern
domain_name_file=/home/sunmi/config/kibana/index/type_log.txt
domain_name_file_pre=/home/sunmi/config/kibana/index/type_log_pre.txt
time_field="@timestamp"
#date=`date +%Y-%m`
#当前日期
CURRENT_DATE=$(date "+%Y-%m-%d")
#更新日志
log_file=/home/sunmi/config/kibana/index/update_index_log.txt
log_del_file=/home/sunmi/config/kibana/index/delete/delete_index_log.txt
echo "${CURRENT_DATE}" >> ${log_file}
echo "${CURRENT_DATE}" >> ${log_del_file}
#先做索引删除操作
# 查询所有索引
indices=$(curl -s -XGET "http://${ES_HOST}:${ES_PORT}/_cat/indices?format=json" | jq -r '.[].index')
# 遍历索引
for index_name in $indices; do
# 使用正则表达式从索引名称中提取日期
if [[ $index_name =~ [0-9]{4}\.[0-9]{2}\.[0-9]{2} ]]; then
date_str=${BASH_REMATCH[0]}
date_str_hyphen=$(echo "$date_str" | sed 's/\./-/g')
index_date=$(date -d "$date_str_hyphen" +%s)
current_date=$(date -d "$CURRENT_DATE" +%s)
# 计算索引日期与当前日期的差异
difference=$(( ($current_date - $index_date) / (24 * 60 * 60) ))
# 如果差异大于30天,则删除索引和对应的Kibana索引模式
if [ $(( $difference > 30 )) -eq 1 ]; then
# 删除Elasticsearch索引
curl -XDELETE "http://${ES_HOST}:${ES_PORT}/$index_name"
echo "Elasticsearch索引 $index_name 已被删除" | tee -a "$log_del_file"
# 删除Kibana索引模式
pattern_id=$(curl -s -XGET "http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/_find?type=index-pattern&search_fields=title&search=$index_name" | jq -r '.saved_objects[0].id')
if [ ! -z "$pattern_id" ]; then
curl -XDELETE "http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/index-pattern/$pattern_id" -H 'kbn-xsrf: true'
echo "Kibana索引模式 $index_name 已被删除" | tee -a "$log_del_file"
else
echo "未找到与索引 $index_name 相关的Kibana索引模式" | tee -a "$log_del_file"
fi
else
echo "$index_name 索引未超过指定期限,不予删除" | tee -a "$log_del_file"
fi
fi
done
#添加索引操作
#请求elasticSearch获取最新的索引数据
curl -XGET "http://${ES_HOST}:${ES_PORT}/_cat/indices?v" > ${domain_name_file_pre}
awk 'NR>1 {print $3}' ${domain_name_file_pre} > ${domain_name_file}
#中间文件,用来存放type_log.txt中有用的行和其行号
middle_file=/home/sunmi/config/kibana/index/middle.txt
grep -E -n '^[[:alnum:].]' "${domain_name_file}" > "${middle_file}"
# 提取索引名称并传递给 Kibana 进行新增操作
domain_name_num=`wc -l ${middle_file} | awk '{print $1}'`
for((i=1;i<=${domain_name_num};i++));do
domain_name_type=`sed -n "${i}p" ${middle_file}| awk -F':' '{print $2}'`
###开始新增新的索引
if [ $action == "add" ];then
curl -f -XPOST -H 'Content-Type: application/json' -H 'kbn-xsrf: anything' \
"http://${KIBANA_HOST}:${KIBANA_PORT}/api/saved_objects/index-pattern/${domain_name_type}" -d"{\"attributes\":{\"title\":\"${domain_name_type}\",\"timeFieldName\":\"@timestamp\"}}" >> ${log_file}
else
echo "action errror" >> ${log_file}
exit 100
fi
#对每一条操作都进行日志记录,这样每执行完成后,可过滤日志文件。
if [ $? -eq 0 ];then
echo "success ${domain_name_type}" >> ${log_file}
else
echo "error ${domain_name_type}" >> ${log_file}
fi
done
#添加默认索引
#curl -f -XPOST -H 'Content-Type: application/json' -H 'kbn-xsrf: anything' http://localhost:5601/api/kibana/settings/defaultIndex -d "{\"value\":\"logstash-app_www_${date}\"}" >> ${log_file}
#mv -f /home/sunmi/config/kibana/index/middle.txt /home/sunmi/config/kibana/index/tmp/
第二种方案
#!/bin/bash
# Elasticsearch 连接配置
HOST="[ip]"
PORT="[port]"
domain_index_file=/home/sunmi/config/kibana/index/index.txt
log_file=/home/sunmi/config/kibana/index/index_log.txt
#当前日期
CURRENT_DATE=$(date "+%Y-%m-%d")
echo "${CURRENT_DATE}" >> ${log_file}
echo "${CURRENT_DATE}" > ${domain_index_file}
# 获取所有索引的名称
indices=$(curl -s "${HOST}:${PORT}/_cat/indices?v" | awk '{print $3}' | tail -n +2)
# 遍历每个索引
for index in $indices; do
echo "${index}" >> ${domain_index_file}
# 获取索引下的所有数据的 id 和时间戳
data=$(curl -s "${HOST}:${PORT}/$index/_search" | jq -r '.hits.hits[] | ._id + " " + .["@timestamp"]')
# 遍历每条数据
while read -r id timestamp; do
# 获取当前的时间戳
now=$(date +%s)
# 获取数据时间戳
timestamp=$(date -d "$timestamp" +%s)
# 计算时间差(单位为天)
diff=$(( (now - timestamp) / 86400 ))
# 如果时间差大于 30 天,则删除该数据
if [ $diff -gt 30 ]; then
curl -X DELETE "${HOST}:${PORT}/$index/_doc/$id"
echo "$index : $id 删除成功" | tee -a "$log_file"
else
echo "$index : $id 未超过期限,不予删除" | tee -a "$log_file"
fi
done <<< "$data"
done