Posted by a user on 2022-04-25 05:51
3 answers in total
Answer from 懂视网, posted 2022-05-02 21:12
ELK version: 6.5.3
[root@test101 ~]# yum -y install mariadb* # installs version 5.5.60 by default
Add the following three lines to /etc/my.cnf (under the [mysqld] section) to enable the slow query log:
slow_query_log
long_query_time = 2 # threshold: any statement taking longer than 2 seconds is written to the slow log
slow_query_log_file = "/var/lib/mysql/test101-slow.log" # path of the slow query log file
Restart MariaDB:
[root@test101 ~]# systemctl restart mariadb
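For reference, a minimal sketch of the resulting /etc/my.cnf fragment (the three lines must sit under the [mysqld] group, or MariaDB will ignore them), plus a quick way to confirm the settings took effect after the restart:
[mysqld]
slow_query_log
long_query_time = 2
slow_query_log_file = "/var/lib/mysql/test101-slow.log"

MariaDB [(none)]> show variables like 'slow_query%';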
1) First generate a file with a large number of rows:
[root@test101 ~]# seq 1 19999999 > /tmp/big
2) Log in to the database and create a test database:
[root@test101 ~]# mysql
MariaDB [(none)]> create database test101; # create the test database
MariaDB [(none)]> use test101;
MariaDB [test101]> create table table1 (id int(10) not null) engine=innodb; # create the test table
3) Load the file /tmp/big created above into the test database to populate the test table:
MariaDB [test101]> show tables;
+-------------------+
| Tables_in_test101 |
+-------------------+
| table1            |
+-------------------+
1 row in set (0.00 sec)
MariaDB [test101]> load data local infile '/tmp/big' into table table1; # note: "local" is required; without it the statement may fail with "ERROR 13 (HY000): Can't get stat of '/tmp/big' (Errcode: 2)"
Query OK, 19999999 rows affected (1 min 47.64 sec)
Records: 19999999 Deleted: 0 Skipped: 0 Warnings: 0
MariaDB [test101]>
The test data has been imported successfully.
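As a quick sanity check you can verify the row count:
MariaDB [test101]> select count(*) from table1; # should return 19999999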
Now run a single-row lookup against the table:
MariaDB [test101]> select * from table1 where id=258;
+-----+
| id  |
+-----+
| 258 |
+-----+
1 row in set (11.76 sec)
MariaDB [test101]>
Then check the slow query log file; the corresponding entry has been written. The database side is now ready:
[root@test101 mysql]# tailf /var/lib/mysql/test101-slow.log
# Time: 181217 15:23:39
# User@Host: root[root] @ localhost []
# Thread_id: 2 Schema: test101 QC_hit: No
# Query_time: 11.758867 Lock_time: 0.000106 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545031419;
select * from table1 where id=258;
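Before wiring the log into ELK, you can also sanity-check it with the mysqldumpslow tool that ships with MariaDB/MySQL, e.g. sorted by query time:
[root@test101 ~]# mysqldumpslow -s t /var/lib/mysql/test101-slow.log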
(Installation and basic configuration of Filebeat, Elasticsearch, and Logstash are omitted.)
1) Create the Logstash pipeline configuration file:
[root@test103 conf.d]# cat /etc/logstash/conf.d/logstash-syslog.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => ["http://10.0.0.102:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}
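Before starting Logstash, you can validate the pipeline syntax with Logstash's built-in config check:
[root@test103 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-syslog.conf --config.test_and_exit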
Start filebeat on test101 and elasticsearch on test102, then start logstash on test103 in the foreground and watch the collected events live.
1) Run a query in MySQL on test101 (make sure the foreground Logstash has finished starting before running it):
MariaDB [test101]> select * from table1 where id=358;
+-----+
| id  |
+-----+
| 358 |
+-----+
1 row in set (13.47 sec)
MariaDB [test101]>
2) Confirm that the slow query log entry has been written on test101:
[root@test101 mysql]# tailf test101-slow.log
# Time: 181218 8:58:30
# User@Host: root[root] @ localhost []
# Thread_id: 3 Schema: test101 QC_hit: No
# Query_time: 13.405630 Lock_time: 0.000271 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545094710;
select * from table1 where id=358;
3) Now check Logstash: the entry has been collected, but the six lines of the slow-log entry above arrived as six separate messages:
[root@test103 logstash]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-syslog.conf
.......# several lines of startup output omitted
[INFO ] 2018-12-18 08:51:48.669 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
# run the MySQL query above only once this line has appeared
{
"message" => "# Time: 181218 8:58:30",
"@timestamp" => 2018-12-18T00:58:33.396Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"name" => "test101",
"containerized" => true,
"architecture" => "x86_64",
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"offset" => 252,
"source" => "/var/lib/mysql/test101-slow.log"
}
{
"message" => "# User@Host: root[root] @ localhost []",
"@timestamp" => 2018-12-18T00:58:33.398Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"containerized" => true,
"name" => "test101",
"architecture" => "x86_64",
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"version" => "6.5.3",
"hostname" => "test101"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 276
}
{
"message" => "# Thread_id: 3 Schema: test101 QC_hit: No",
"@timestamp" => 2018-12-18T00:58:33.398Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"os" => {
"family" => "redhat",
"platform" => "centos",
"version" => "7 (Core)",
"codename" => "Core"
},
"name" => "test101",
"containerized" => true,
"architecture" => "x86_64",
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 315
}
{
"message" => "# Query_time: 13.405630 Lock_time: 0.000271 Rows_sent: 1 Rows_examined: 19999999",
"@timestamp" => 2018-12-18T00:58:33.398Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"name" => "test101",
"architecture" => "x86_64",
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"containerized" => true,
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 359
}
{
"message" => "SET timestamp=1545094710;",
"@timestamp" => 2018-12-18T00:58:33.398Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"containerized" => true,
"name" => "test101",
"architecture" => "x86_64",
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 443
}
{
"message" => "select * from table1 where id=358;",
"@timestamp" => 2018-12-18T00:58:33.398Z,
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"@version" => "1",
"host" => {
"architecture" => "x86_64",
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"name" => "test101",
"containerized" => true,
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"beat" => {
"name" => "test101",
"version" => "6.5.3",
"hostname" => "test101"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 469
}
Because a single slow-log entry is split into six messages, the logs are hard to analyse, so the six messages collected above need to be merged into one.
Modify the Filebeat configuration: add the three multiline lines below after the log path, then restart Filebeat (a full input section is sketched after the option descriptions that follow):
paths:
  - /var/lib/mysql/test101-slow.log
  #- c:\programdata\elasticsearch\logs\*
# add the following three lines
multiline.pattern: "^# User@Host:"
multiline.negate: true
multiline.match: after
Notes:
multiline.pattern: a regular expression matched against each line; here it matches the line beginning with "# User@Host:".
multiline.negate: true or false (default false). With false, lines that match the pattern are merged into the previous line; with true, every line that does NOT match the pattern is merged into the nearest preceding line that does.
multiline.match: after or before; controls whether the merged lines are appended to the end of the matching line or prepended to its beginning.
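Putting it together, the relevant filebeat.yml input section looks roughly like this (a sketch for Filebeat 6.x; unrelated options omitted):
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/lib/mysql/test101-slow.log
  multiline.pattern: "^# User@Host:"
  multiline.negate: true
  multiline.match: after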
1) Run another query in MySQL:
MariaDB [test101]> select * from table1 where id=368;
+-----+
| id  |
+-----+
| 368 |
+-----+
1 row in set (12.54 sec)
MariaDB [test101]>
2) Check the slow query log; the new entry has been written:
[root@test101 mysql]# tailf test101-slow.log
# Time: 181218 9:17:42
# User@Host: root[root] @ localhost []
# Thread_id: 3 Schema: test101 QC_hit: No
# Query_time: 12.541603 Lock_time: 0.000493 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545095862;
select * from table1 where id=368;
3) Check Logstash's live output: the collected entry is now a single message:
[root@test103 logstash]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-syslog.conf
.......# several lines of startup output omitted
{
"message" => # User@Host: root[root] @ localhost []
# Thread_id: 3 Schema: test101 QC_hit: No
# Query_time: 12.541603 Lock_time: 0.000493 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545095862;
select * from table1 where id=368;",
#这一行就是在slow-log中的信息,已经被合并成了一行,慢日志的“"# Time: 181218 9:17:42” 单独成行,在前面没有贴出来
"prospector" => {
"type" => "log"
},
"input" => {
"type" => "log"
},
"host" => {
"architecture" => "x86_64",
"os" => {
"codename" => "Core",
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat"
},
"containerized" => true,
"name" => "test101",
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
},
"log" => {
"flags" => [
[0] "multiline"
]
},
"@timestamp" => 2018-12-18T01:17:45.336Z,
"@version" => "1",
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"source" => "/var/lib/mysql/test101-slow.log",
"offset" => 504
}
With the messages merged in the previous step, the next task is to use the grok plugin to parse the message into structured JSON fields.
Edit logstash-syslog.conf and insert the whole filter block between input and output, configured as follows:
[root@test103 conf.d]# cat logstash-syslog.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  beats {
    port => 5044
  }
}
filter {
  # parse the merged message into JSON-style fields
  grok {
    match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\]\s*# Thread_id:\s+%{NUMBER:thread_id:int}\s+Schema: %{USER:schema}\s+QC_hit: %{WORD:QC_hit}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
  }
  # tag the time line of the entry (e.g. "# Time: 181218 9:17:42") with "drop"
  grok {
    match => { "message" => "# Time: " }
    add_tag => [ "drop" ]
    tag_on_failure => []
  }
  # drop events tagged "drop", i.e. the "# Time: ..." lines of the slow log
  if "drop" in [tags] {
    drop {}
  }
  # convert the UNIX epoch captured by grok (field "timestamp") into @timestamp
  date {
    match => [ "timestamp", "UNIX" ]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  ruby {
    code => "event.set('[@metadata][today]', Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
  }
  # remove the raw message field
  mutate {
    remove_field => [ "message" ]
  }
}
output {
  elasticsearch {
    hosts => ["http://10.0.0.102:9200"]
    #index => "%{tags[0]}"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}
1) Delete the old Elasticsearch indices so you start from a clean slate:
[root@test102 filebeat]# curl 10.0.0.102:9200/_cat/indices
green open .kibana_1 udOUvbprSnKWUJISwD0r_g 1 0 8 0 74.4kb 74.4kb
[root@test102 filebeat]#
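If you also need Filebeat to re-ship lines it has already read, stop it and remove its registry file first (the default path for an RPM install of Filebeat 6.x; adjust if your layout differs):
[root@test101 ~]# systemctl stop filebeat
[root@test101 ~]# rm -f /var/lib/filebeat/registry
[root@test101 ~]# systemctl start filebeat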
2) Restart Filebeat, start the Logstash foreground pipeline again, and run a query in MariaDB:
MariaDB [test101]> select * from table1 where id=588;
+-----+
| id  |
+-----+
| 588 |
+-----+
1 row in set (13.00 sec)
MariaDB [test101]>
3) Confirm that the slow query log entry has been written:
[root@test101 mysql]# tailf test101-slow.log
# Time: 181218 14:39:38
# User@Host: root[root] @ localhost []
# Thread_id: 4 Schema: test101 QC_hit: No
# Query_time: 12.999487 Lock_time: 0.000303 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545115178;
select * from table1 where id=588;
4) Check Logstash's live output; the events are now parsed into JSON-style fields:
[INFO ] 2018-12-18 14:25:02.802 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
{
"QC_hit" => "No",
"rows_sent" => 1,
"action" => "select",
"input" => {
"type" => "log"
},
"offset" => 1032,
"@version" => "1",
"query_host" => "localhost",
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"query" => "select * from table1 where id=428;",
"@timestamp" => 2018-12-18T06:25:34.017Z,
"prospector" => {
"type" => "log"
},
"lock_time" => 0.000775,
"query_user" => "root",
"log" => {
"flags" => [
[0] "multiline"
]
},
"schema" => "test101",
"timestamp" => "1545114330",
"thread_id" => 4,
"query_time" => 14.155133,
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"host" => {
"name" => "test101",
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
"containerized" => true,
"os" => {
"codename" => "Core",
"family" => "redhat",
"version" => "7 (Core)",
"platform" => "centos"
},
"architecture" => "x86_64"
},
"rows_examined" => 19999999,
"source" => "/var/lib/mysql/test101-slow.log"
{
"QC_hit" => "No",
"rows_sent" => 1,
"action" => "select",
"input" => {
"type" => "log"
},
"offset" => 1284,
"@version" => "1",
"query_host" => "localhost",
"beat" => {
"name" => "test101",
"hostname" => "test101",
"version" => "6.5.3"
},
"query" => "select * from table1 where id=588;",
"@timestamp" => 2018-12-18T06:39:44.082Z,
"prospector" => {
"type" => "log"
},
"lock_time" => 0.000303,
"query_user" => "root",
"log" => {
"flags" => [
[0] "multiline"
]
},
"schema" => "test101",
"timestamp" => "1545115178",
"thread_id" => 4,
"query_time" => 12.999487,
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"host" => {
"name" => "test101",
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
"containerized" => true,
"os" => {
"codename" => "Core",
"family" => "redhat",
"version" => "7 (Core)",
"platform" => "centos"
},
"architecture" => "x86_64"
},
"rows_examined" => 19999999,
"source" => "/var/lib/mysql/test101-slow.log"
}
Finally, open Kibana and create the index pattern; the logs now show up correctly in the Kibana UI.
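To double-check on the Elasticsearch side, you can list the indices again; given the index template in the output section and Filebeat 6.5.3, expect a daily index named like filebeat-6.5.3-2018.12.18:
[root@test102 ~]# curl 10.0.0.102:9200/_cat/indices?v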
Appendix: slow log format differences.
MySQL 5.7.20 slow log entry:
# Time: 2018-12-18T08:43:24.828892Z
# User@Host: root[root] @ localhost [] Id: 7
# Query_time: 15.819314 Lock_time: 0.000174 Rows_sent: 1 Rows_examined: 19999999
SET timestamp=1545122604;
select * from table1 where id=258;
The matching grok pattern:
match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\]\s+Id:\s+%{NUMBER:id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
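If a single pipeline has to handle both formats, grok accepts a list of patterns and tries them in order, so the MariaDB pattern from the filter above can be combined with this MySQL 5.7 one. A sketch, where the "..." stand in for the full patterns shown earlier:
grok {
  match => { "message" => [
    "(?m)^# User@Host: ... QC_hit: %{WORD:QC_hit} ...",  # MariaDB 5.5 pattern from the filter above
    "(?m)^# User@Host: ... Id:\s+%{NUMBER:id:int} ..."   # MySQL 5.7 pattern from this appendix
  ] }
}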
Logstash output for a query against the MySQL 5.7.20 instance:
{
"rows_examined" => 19999999,
"input" => {
"type" => "log"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"query_user" => "root",
"query_time" => 15.461892,
"query_host" => "localhost",
"@version" => "1",
"query" => "select * from table1 where id=258;",
"host" => {
"containerized" => true,
"os" => {
"platform" => "centos",
"version" => "7 (Core)",
"family" => "redhat",
"codename" => "Core"
},
"id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
"name" => "test101",
"architecture" => "x86_64"
},
"source" => "/home/data/test101-mysql5.7.20-slow.log",
"action" => "select",
"lock_time" => 0.000181,
"prospector" => {
"type" => "log"
},
"offset" => 1416,
"log" => {
"flags" => [
[0] "multiline"
]
},
"rows_sent" => 1,
"timestamp" => "1545122934",
"beat" => {
"hostname" => "test101",
"version" => "6.5.3",
"name" => "test101"
},
"@timestamp" => 2018-12-18T08:48:58.531Z,
"id" => 7
}
How can ELK collect MySQL slow logs and format them with grok?
Answer from a user, 2022-05-02 18:20
When handling logs from a Java system (or any other logs), logging is usually done with log4j or logback; once the logs are collected into Elasticsearch, they are easy to query and aggregate.
Define the log format at collection time: for example, use Logstash's grok plugin to split each record into fields, like the columns of a database table, before storing it in Elasticsearch. In Kibana you can then filter and aggregate on those fields conveniently. A small illustration follows.
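For instance, a made-up log4j line such as "2022-04-25 05:51:00,123 INFO com.example.Foo - service started" could be split with stock grok patterns like:
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:msg}
TIMESTAMP_ISO8601, LOGLEVEL, JAVACLASS and GREEDYDATA all ship with grok.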
For concrete steps, you can check this link to see whether it covers what you need:
Web link
Answer from a user, 2022-05-02 19:38
If you need to read data back out of Elasticsearch, use one of the client libraries Elasticsearch provides, or Spring Data Elasticsearch.
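For a quick look without writing any code, the REST search API works too; for example, to find slow queries that took more than 10 seconds in the indices built above (a sketch using Lucene query-string syntax):
[root@test102 ~]# curl '10.0.0.102:9200/filebeat-*/_search?q=query_time:>10&pretty'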