在之前一篇文章中关于Logstash安装使用已经演示过读写的功能了,Logstash不单是对数据进行读取和输出的功能,另一个强大的功能就是对数据的清洗,也就是俗称的过滤,那么此篇文章就是介绍Logstash使用Grok来进行对数据的清洗过滤!
grok是用正则表达式来捕获关键数据的,grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便车查询的结构,他是目前logstash中解析非结构化日志数据最好的方式。 Grok的语法规则
%{语法:语义}语法是指匹配的模式,例如使用NUMBER模式就可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址 如:输入的内容为127.0.0.1 [07/Feb/2020:16:24:19 +0800] “GET /HTTP /1.1” 430 5039 那么使用%{IP:clientip}匹配模式将获得的结果为:clientip:127.0.0.1 那么使用%{HTTPDATE:timestamp}匹配模式将获得结果为:timestamp:07/Feb/2020:16:24:19 +0800 那么使用%{QS:referrer}匹配模式将获得的结果为:referrer:“GET /HTTP /1.1” 这些匹配模式定义模板是存放在/logstash-6.5.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns 这里有一些常用的软件的匹配模式,其中grok-patterns是最基础的匹配模式,查看一下里面是啥 也就是定义了一些正则表达式罢了!
在编写过滤脚本前,先提供一个Grok的在线语法检测工具Grok在线语法检测 这个网站可能比较慢,科学上网的可以用这个,不会科学上网的可以用国内的国内Grok在线语法检测效果一样的
将nginx的日志放入进去最为输入数据 192.168.0.99 - - [30/Sep/2020:10:00:01 +0000] "GET / HTTP/1.1" 200 971 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36" "-" %{IP:clientip}那么更多匹配写法演示如下 注意这里的一些符号问题,如空格、{}、-,这些符号需要转义一下
空格 \ - \- [ \[否则无法匹配,通过上面的匹配演示,我们将输入的内容分成五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志非常有用,这正式使用grok的目的,Lostash默认提供近200个匹配模式,(其实也就是定义好的正则表达式)然我们来使用。
1.grok字段匹配
vi test-grok.conf input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] } } output{ stdout{codec=>"rubydebug"} }2.启动使用test-grok.conf配置文件的Logstash
输入: 192.168.0.99 -- [30/Sep/2020:10:00:01 +0000] "GET / HTTP/1.1" 200 971 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36" "-"3.排除掉不需要的字段
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] remove_field => ["message"] } } output{ stdout{codec=>"rubydebug"} }4.日期格式化
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] #match => [ "time","MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"] %{TIMESTAMP_ISO8601:time} } } output{ stdout{codec=>"rubydebug"} }
那么格式化后存在两个timestamp,一个为@timestamp另一个是timestamp,那么这里的@timestamp是logstash日志收集的时间,timestamp是massage中的时间,且使用date后会将timestamp的值付给@timestamp,那么此时可以将其中一个去除!
5.去除timestamp
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] } } output{ stdout{codec=>"rubydebug"} }这里注意:经过grok匹配后的remove_field 中不能直接将timestamp剔除否则在date中无法将timestamp的值赋予给@timestamp只能在时间替换后再将冗余的timestamp字段剔除掉!(呆了@的字段属于元数据字段,这些字段是不能被剔除的吧!) 那么在输出的数据中就没有两个代表时间的字段了! 6.指定字段替换内容 数据修改(mutate)可以使用gsub通过正则表达式替换字段中匹配到的值,只对字符串有效,下面演示gsub 的使用
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] } } output{ stdout{codec=>"rubydebug"} }匹配替换成功! 6.指定规则分割
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] #remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] split => ["message","/"] } } output{ stdout{codec=>"rubydebug"} }7.字段重命名
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] #remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] split => ["new_message","/"] rename => {"message" => "new_message"} } } output{ stdout{codec=>"rubydebug"} }在这里不难看出mutate中的配置是存在一定的优先级的,这里重命名的优先级是高于拆分的 8.IP详细信息分析插件
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] #remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] split => ["new_message","/"] rename => {"message" => "new_message"} } geoip{ source => "clientip" } } output{ stdout{codec=>"rubydebug"} }提取部分geoip产生的数据
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] #remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] split => ["new_message","/"] rename => {"message" => "new_message"} } geoip{ source => "clientip" fields => ["city_name","region_name","country_name","ip","longitude","timezone"] } } output{ stdout{codec=>"rubydebug"} }9.匹配任何内容
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \-\-\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}\ %{GREEDYDATA:oth1}\ %{GREEDYDATA:oth2}\ %{GREEDYDATA:oth3}"] #remove_field => ["message"] } date{ match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } mutate{ remove_field => ["timestamp"] gsub => ["host", "master","yyy"] split => ["new_message","/"] rename => {"message" => "new_message"} } geoip{ source => "clientip" fields => ["city_name","region_name","country_name","ip","longitude","timezone"] } } output{ stdout{codec=>"rubydebug"} }这里就任意匹配了其他的一些字段