Logstach日志文件管理工具的搭建与使用

前言


由于实验室需要搭建一个类似于后台运维的平台,其中涉及到了日志的收集这一项。需要的是从不同的服务器中收集到有用的信息,并统一进行处理展示。这里的日志收集我主要是用Logstash来进行的,下面简单介绍一下logstach的搭建与使用。

一、平台的搭建

先来了解一下Logstash是啥:Logstash是一个开源数据搜集引擎,支持实时数据管道功能。Logstash可以动态整合分散的数据源,根据你的选择对数据标准化。根据不同的高级下游分析和可视化用例对数据进行梳理。尽管Logstash的早期目标是搜集日志,现在它的功能已完全不只于此。任何事件类型都可以加入分析,通过输入、过滤器和输出插件进行转换。与此同时,还提供了很多原生编解码工具简化消息处理。Logstash通过海量数据处理和多种多样的数据格式支持延伸了你对数据的洞察力。

那么就来简单的搭建一下吧

logstash的搭建非常的简单,可以从官网【https://www.elastic.co/cn/products/logstash】上下载logstash的安装包。由于我使用的是服务器,就直接用wget命令了。注意,logstash安装的前提是确保已经安装jdk1.8。

1
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.tar.gz

解压到文件夹

1
tar -zxvf logstash-6.4.2.tar.gz

下面可以根据相应的需求进入文件夹的bin目录下进行配置。

二、对配置文件的配置

1.一个demo

首先我们要先了解,Logstash 通过管道进行运作,管道有两个必需的元素,输入和输出,还有一个可选的元素,过滤器。输入插件从数据源获取数据,过滤器插件根据用户指定的数据格式修改数据,输出插件则将数据写入到目的地。

比如我们在bin目录下直接输入,将会启动logstash

1
./logstash -e 'input { stdin { } } output { stdout {} }'

启动logstash,输入hello world~之后会有如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@HadoopSlave1 bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash logs to /logstash-6.4.2/logs which is now configured via log4j2.properties
[2019-06-04T15:52:12,718][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-06-04T15:52:13,316][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.2"}
[2019-06-04T15:52:15,272][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-06-04T15:52:15,404][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x3b59181c run>"}
The stdin plugin is now waiting for input:
[2019-06-04T15:52:15,460][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-06-04T15:52:15,697][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
hello world~
{
"@version" => "1",
"@timestamp" => 2019-06-04T07:52:35.587Z,
"message" => "hello world~",
"host" => "HadoopSlave1"
}

输入的hello world~经过管道和过滤器之后变成了这一段

1
2
3
4
5
6
{
"@version" => "1",
"@timestamp" => 2019-06-04T07:52:35.587Z,
"message" => "hello world~",
"host" => "HadoopSlave1"
}

在此基础上更进一步

2.简单的数据监控

比如要对某个文件进行监控,当文件有变动时就做出相应的反应,这时我们需要在配置文件中配置下面一段代码

1
2
3
4
5
6
7
8
9
10
input{
file{
path => ["/LabDataUpdate/test.log"]
}
}
output {
file{
path => ["/collection.txt"]
}
}

接着,启动logstash

1
nohup ./logstash -f file.conf &

对这段配置代码的解释是这样子的,我需要监控的文件位置位于 /LabDataUpdate/test.log 下,将其读入到logstash的管道中,当其中的日志文件发生变化时,就会将变化的日志输出到 /collection.txt 下。注意,logstash仅仅只会将变化的日志输入到文件下,而不是全部日志。

当然,上述的简单监控仅仅局限于同一台服务器的日志收集,那么我们想要收集不同服务器中的日志到同一台服务器下时该怎么做?

3.用tcp传输

比如说我有两台服务器,其中主服务器也就是存放所有收集来的日志服务器。主服务器称为111,另外一台服务器称为112,我要将112的日志输出到111中。

先对112进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 这里是输入
input{
file{
path => ["/LabDataUpdate/count.log", "/LabPatentUpdate/count.log", "/LabProjectUpdate/project_log.log"]
}
}
# 这里是过滤器
filter {

}
# 这里是输出
output {
tcp {
host => "10.1.13.111"
port => 9600
codec => json_lines
}
}

再对111进行配置,同样的,111服务器上也需要安装logstash,两边同时都要启动,从而进行日志的收集监控。111中的配置如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input{
tcp {
mode => "server"
port => 9600
ssl_enable => false
}
}
output {
file {
path => "/logstash-6.4.2/bin/log/%{host}.log"
codec => line { format => "custom format: %{message}"}
}
stdout {
codec => rubydebug
}
}

这样将会收集112服务器的指定目录的日志到111服务器的output目录下。要注意一点的是,两个配置文件的端口号要统一,否则无法完成收集任务。也能配置多个服务器,只要端口相同,都能收集到111服务器的相应文件下。

3.监控数据库

logstash还能够对数据库表进行监控,当表的数据发生变化时,能对数据库进行检测。也能按照时间间隔对数据库进行一次检测。我这边选择的策略是按照时间间隔来监控数据库其中四张表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
input{
tcp {
mode => "server"
port => 9600
ssl_enable => false
}
jdbc{
# 这个是类型,下面的输出可以根据这个字段选择输出的文件路径
type => "project"
# 这个是要连接的数据库
jdbc_connection_string => "jdbc:mysql://10.1.13.29/techpooldata"
# 需要引入jdbc的jar包
jdbc_driver_library => "/mysql_dirver/mysql-connector-java-5.1.13-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "tdlabDatabase"

# 监控的时间间隔,这边是一分钟
schedule => "* * * * *"
tracking_column => "id"
use_column_value => "true"
schedule => "* * * * *"
last_run_metadata_path => "./logstash_jdbc_last_run"
# 要监控的数据库信息,这边选择监控数据库的数量变化情况
statement => "select count(*) from project where id > :sql_last_value"
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc{
type => "patent"
jdbc_connection_string => "jdbc:mysql://10.1.13.29/techpooldata"
jdbc_driver_library => "/mysql_dirver/mysql-connector-java-5.1.13-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_user => "root"
jdbc_password => "tdlabDatabase"

schedule => "* * * * *"
tracking_column => "id"
use_column_value => "true"
schedule => "* * * * *"
last_run_metadata_path => "./logstash_jdbc_last_run"
statement => "select count(*) from patent where id > :sql_last_value"
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc{
type => "paper"
jdbc_connection_string => "jdbc:mysql://10.1.13.29/techpooldata"
jdbc_driver_library => "/mysql_dirver/mysql-connector-java-5.1.13-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_user => "root"
jdbc_password => "tdlabDatabase"

schedule => "* * * * *"
tracking_column => "id"
use_column_value => "true"
schedule => "* * * * *"
last_run_metadata_path => "./logstash_jdbc_last_run"
statement => "select count(*) from paper where id > :sql_last_value"
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc{
type => "expert"
jdbc_connection_string => "jdbc:mysql://10.1.13.29/techpooldata"
jdbc_driver_library => "/mysql_dirver/mysql-connector-java-5.1.13-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_user => "root"
jdbc_password => "tdlabDatabase"

schedule => "* * * * *"
tracking_column => "id"
use_column_value => "true"
schedule => "* * * * *"
last_run_metadata_path => "./logstash_jdbc_last_run"
statement => "select count(*) from expert where id > :sql_last_value"
jdbc_default_timezone =>"Asia/Shanghai"
}
}
output {
file {
path => "/logstash-6.4.2/bin/log/%{host}.log"
codec => line { format => "custom format: %{message}"}
}
stdout {
codec => rubydebug
}
# 可以根据类型不同输出到不同的文件夹中,这边是输出到同一个文件中
if [type]=="patent"{
file {
path => "/logstash-6.4.2/bin/log/MysqlOutPut.log"
}
stdout{
# 以json的格式进行输出
codec => json_lines
}
}
if [type]=="paper"{
file {
path => "/logstash-6.4.2/bin/log/MysqlOutPut.log"
}
stdout{
codec => json_lines
}
}
if [type]=="project"{
file {
path => "/logstash-6.4.2/bin/log/MysqlOutPut.log"
}
stdout{
codec => json_lines
}
}
if [type]=="expert"{
file {
path => "/logstash-6.4.2/bin/log/MysqlOutPut.log"
}
stdout{
codec => json_lines
}
}
}

每个字段的解释都在代码中给出。

这样和之前的代码相结合,就能实现logstash最简单的日志监控功能。

三、总结

主要介绍和实现了logstash的简单搭建与监控功能。其一,监控同一台服务器下的日志文件。其二,监控不同服务器下的日志文件,实现日志的收集功能。其三,监控数据库。

本文中还没有介绍过滤器的功能,有时间再后续补上。