系统的可观测性学习-ELK(四)

公言雨

发布于 2022.03.20 18:03 阅读 1679 评论 0

Logstash

什么是Logstash

logstash是一个数据抽取工具,将数据从一个地方转移到另一个地方。如hadoop生态圈的sqoop等。

logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。

Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。

Logstash输入插件

配置文件

input {
    #输入插件
}
filter {
    #过滤匹配插件
}
output {
    #输出插件
}

启动操作:

logstash.bat -e 'input{stdin{}} output{stdout{}}'
1

为了好维护,将配置写入文件,启动

logstash.bat -f ../config/test1.conf

读取文件配置

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在 <path.data>/plugins/inputs/file下面,文件名类似于.sincedb_123456,而<path.data>表示logstash插件存储目录,默认是LOGSTASH_HOME/data。

input {
    file {
        path => ["文件路径"]
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

path => ["D:/ProgramData/ELK/logstash-7.8.1/Test*.log"] *表示匹配文件名前面有Test log文件

Test.log 注意:一行日志结束为回车符 不回车不解析

172.16.213.132 [09/Nov/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
172.16.213.132 [09/Nov/2019:16:24:20 +0800] "GET / HTTP/1.1" 200 5039
172.16.213.132 [09/Nov/2019:13:24:25 +0800] "PUT / HTTP/1.1" 200 5039
​

 

image-20220313190909457

读取TCP网络数据

input {
  tcp {
    port => "1234"
  }
}
​
filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}
​
output {
    stdout{
        codec=>rubydebug
    }
}

Logstash过滤器插件(Filter)

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

Grok

grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。

Grok 的语法规则是:

%{语法: 语义}

例如输入的内容为:

172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
%{IP:clientip} 匹配模式将获得的结果为:clientip: 172.16.213.132
%{HTTPDATE:timestamp} 匹配模式将获得的结果为:timestamp:07/Feb/2018:16:24:19 +0800
%{QS:referrer} 匹配模式将获得的结果为:referrer: "GET / HTTP/1.1"

下面是一个组合匹配模式,它可以获取上面输入的所有内容:

%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}

通过上面这个组合匹配模式,我们将输入的内容分成了五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用grok的目的。

例子 新建配置文件test3.conf:

input{
    stdin{}
}
filter{
    grok{
        match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
    }
}
output{
    stdout{
        codec => "rubydebug"
    }
}

输入内容:

172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

输出结果:

image-20220313193405903

时间处理(Date)

date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里,这在之前已经做过简单的介绍。 下面是date插件的一个配置示例(这里仅仅列出filter部分):

修改配置文件如下

filter {
    grok {
        match => ["message", "%{HTTPDATE:timestamp}"]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

结果:

image-20220313193931499

数据修改(Mutate)

正则表达式替换匹配字段

gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):

filter {
    mutate {
        gsub => ["filed_name_1", "/" , "_"]
    }
}

这个示例表示将filed_name_1字段中所有"/“字符替换为”_"。

分隔符分割字符串为数组

split可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):

filter {
    mutate {
        split => ["filed_name_2", "|"]
    }
}

这个示例表示将filed_name_2(属性名)字段以"|"为区间分隔为数组。

重命名字段

rename可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):

filter {
    mutate {
        rename => { "old_field" => "new_field" }
    }
}

删除字段

remove_field可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):

filter {
    mutate {
        remove_field  =>  ["timestamp"]
    }
}

这个示例表示将字段timestamp删除。

GeoIP 地址查询归类

filter {
    geoip {
        source => "ip_field"
    }
}

综合例子:

input {
    stdin {}
}
filter {
    grok {
    	match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
		remove_field => [ "message" ]
	}
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
    mutate {
        convert => [ "response","float" ]
        rename => { "response" => "response_new" }   
        gsub => ["referrer","\"",""]          
        split => ["clientip", "."]
    }
}

output{
    stdout{
        codec => "rubydebug"
    }
}

输出结果:image-20220313195134659

Logstash输出插件

https://www.elastic.co/guide/en/logstash/current/output-plugins.html

output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:

  • file: 表示将日志数据写入磁盘上的文件。

  • Elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。

1、输出到标准输出(stdout)

output {
    stdout {
        codec => rubydebug
    }
}

2、保存为文件(file)

output {
    file {
        path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
    }
}

3、输出到elasticsearch

output {
    elasticsearch {
        hosts => ["192.168.1.1:9200","172.16.213.77:9200"]
        index => "logstash-%{+YYYY.MM.dd}"       
    }
}
  • hosts:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。

  • index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。注意:这种格式会使用日志信息的日期进行替换

  • manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。

  • template_name:这个配置项用来设置在Elasticsearch中模板的名称。

 

综合案例

input {
    file {
        path => ["D:/ES/logstash-7.3.0/nginx.log"]        
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
        remove_field => [ "message" ]
   }
	date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
	mutate {
           rename => { "response" => "response_new" }
           convert => [ "response","float" ]
           gsub => ["referrer","\"",""]
           remove_field => ["timestamp"]
           split => ["clientip", "."]
        }
}

output {
    stdout {
        codec => "rubydebug"
    }

    elasticsearch {
        host => ["localhost:9200"]
        index => "logstash-%{+YYYY.MM.dd}"       
    }
}

 

项目实战

项目一:ELK用于日志分析

需求:集中收集分布式服务的日志

  • 逻辑模块程序随时输出日志

  • logstash收集日志到es

1、逻辑模块程序随时输出日志

前置知识 (有选择看 太细了。。)

SpringBoot—Logback日志,输出到文件以及实时输出到web页面

springboot使用logback日志框架超详细教程

快速使用看这个

SpringBoot系列——Logback日志,输出到文件以及实时输出到web页面 - huanzi-qch - 博客园 (cnblogs.com)

新建logback-spring.xml在resources

image-20220314174802655

<?xml version="1.0" encoding="UTF-8"?>

<configuration>
    <!--定义日志文件的存储地址,使用绝对路径-->
    <property name="LOG_HOME" value="D:/Documents/WorkDocument/logs"/>

    <!-- Console 输出设置 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- 按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <fileNamePattern>${LOG_HOME}/test-%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 异步输出 -->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
        <discardingThreshold>0</discardingThreshold>
        <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多只能添加一个 -->
        <appender-ref ref="FILE"/>
    </appender>


    <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>

    <logger name="org.springframework.boot" level="DEBUG"/>

    <root level="info">
        <!--<appender-ref ref="ASYNC"/>-->
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

 

在application.yaml 添加

logging:
  config: classpath:logback-spring.xml

TestLog.java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;

/**
 * @author gyy
 * @date 2022/3/14
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestLog {
    private static final Logger LOGGER= LoggerFactory.getLogger(TestLog.class);

    @Test
    public void testLog(){
        Random random =new Random();

        while (true){
            int userid=random.nextInt(10);
            LOGGER.info("userId:{},send:{}",userid,"hello world.I am "+userid);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2、logstash收集日志到es

logstash config

input {
    file {
        path => ["D:/Documents/WorkDocument/logs/test-*.log"]        
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { "message" =>
 "%{DATA:datetime} \[%{DATA:thread}\] %{DATA:level}  %{DATA:class} - %{GREEDYDATA:1ogger}" }
        remove_field => [ "message" ]
   }
	date {
        match => ["datetime", "dd/MMM/yyyy:HH:mm:ss.SSS"]
    }
	if "_grokparsefailure" in [tags] {
        drop { }
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{+YYYY.MM.dd}"       
    }
}

然后先启动 es 在启动logstash 最后k

 

项目二:学成在线站内搜索模块

在高并发需求中 如淘宝用户搜索商品 使用关系数据库mysql之类肯定不科学 大部分应该是将数据库数据放入es中 使用es集群完成并发需求 所以简单学习一下流程

1、mysql创建user_info表

/*
 Navicat Premium Data Transfer

 Source Server         : local_mysql
 Source Server Type    : MySQL
 Source Server Version : 80027
 Source Host           : localhost:3306
 Source Schema         : learning_db

 Target Server Type    : MySQL
 Target Server Version : 80027
 File Encoding         : 65001

 Date: 16/03/2022 13:43:19
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user_info
-- ----------------------------
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info`  (
  `user_id` bigint NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '用户名',
  `user_gender` int NOT NULL COMMENT '性别 0 male  1-female',
  `user_id_number` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '身份证号',
  `user_nucleic_acid` int NULL DEFAULT NULL COMMENT '0-正常 1-确诊 2-密接 3-次密接 4-疑似',
  PRIMARY KEY (`user_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '用户信息表' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of user_info
-- ----------------------------
INSERT INTO `user_info` VALUES (1, '小米', 1, '123456789X', 0);
INSERT INTO `user_info` VALUES (2, '小明', 0, '123456789X', 1);

SET FOREIGN_KEY_CHECKS = 1;

image-20220317200612201

2、创建索引learning_db

http://localhost:9200/learning_db-user_info

{
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "user_id": {
                "type": "keyword"
            },
            "user_name": {
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart",
                "type": "text"
            },
            "user_gender": {
                "type": "keyword"
            },
            "user_id_number": {
                "type": "keyword"
            },
            "user_create_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis"
            },
            "user_nucleic_acid_last_time": {
                "type": "date",
                "format":  "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis"
            },
            "user_update_time": {
                "type": "date",
                "format":  "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis"
            }
        }
    }
}

3、logstash配置mysql.conf

logstash每个执行完成会在/config/.logstash_jdbc_last_run记录执行时间下次以此时间为基准进行增量同步数据到索引库。

input {
  jdbc {
    # 自己本地的驱动位置 也可以用相对
    jdbc_driver_library => "D:/Program Files/maven-repository/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"    # 8.0以上版本:一定要把serverTimezone=UTC加上
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/learning_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "123456"
    # 一分钟一扫
    schedule => "* * * * *"
    #要执行的sql 也可以指定sql文件
    statement => "SELECT * FROM user_info WHERE user_update_time >= :sql_last_value"
    #设置时区
    jdbc_default_timezone => "Asia/Shanghai"
    # sql_last_value储存位置 文件要有 可空 
    last_run_metadata_path => "D:/ProgramData/ELK/logstash-7.8.1/config/.logstash_jdbc_last_run"
  }
}
output {
    elasticsearch {
        # ES的IP地址及端口 可写多个
        hosts => ["127.0.0.1:9200"]
        # 索引名称 可自定义
        index => "learning_db-user_info"
        # 需要关联的数据库中有有一个id字段,对应类型中的_id 
        document_id => "%{user_id}"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

 

4、启动

.\logstash.bat -f ..\config\mysql.conf

5、es结果

{
	"took": 809,
	"timed_out": false,
	"_shards": {
		"total": 1,
		"successful": 1,
		"skipped": 0,
		"failed": 0
	},
	"hits": {
		"total": {
			"value": 2,
			"relation": "eq"
		},
		"max_score": 1,
		"hits": [
			{
				"_index": "learning_db-user_info",
				"_type": "_doc",
				"_id": "1",
				"_score": 1,
				"_source": {
					"user_id": 1,
					"user_nucleic_acid": 0,
					"user_gender": 1,
					"user_nucleic_acid_last_time": null,
					"@version": "1",
					"user_create_time": "2022-03-17T16:41:00.000Z",
					"@timestamp": "2022-03-17T11:56:00.913Z",
					"user_name": "小米",
					"user_update_time": "2022-03-17T16:41:01.000Z",
					"user_id_number": "123456789X"
				}
			},
			{
				"_index": "learning_db-user_info",
				"_type": "_doc",
				"_id": "2",
				"_score": 1,
				"_source": {
					"user_id": 2,
					"user_nucleic_acid": 1,
					"user_gender": 0,
					"user_nucleic_acid_last_time": null,
					"@version": "1",
					"user_create_time": "2022-03-17T16:41:03.000Z",
					"@timestamp": "2022-03-17T11:56:00.923Z",
					"user_name": "小明",
					"user_update_time": "2022-03-17T16:41:06.000Z",
					"user_id_number": "123456789X"
				}
			}
		]
	}
}

 

 

 

6、后端代码

 

ObservabilityLearning: ObservabilityLearning 整合框架demo (gitee.com)

 

ElastAlert

是什么

ElastAlert是一个简单的框架,用于从Elasticsearch中的数据中发出异常,尖峰或其他感兴趣的模式的警报。

 

组成

它通过将Elasticsearch与两种类型的组件(规则类型和警报)结合使用。定期查询Elasticsearch,并将数据传递到规则类型,该规则类型确定找到任何匹配项。发生匹配时,它会发出一个或多个警报,这些警报根据不同的类型采取相应的措施。

ElastAlert由一组规则配置,每个规则定义一个查询,一个规则类型和一组警报。

ElastAlert是基于python2开发的一个告警框架,它主要有以下特点:

问题

安装出现问题,而且这个项目3年不维护了,目前放弃,但是是必须找到发邮件之类通知替代品。