logstash配置的话会比较难一点(logstash 高版本自带了本文所使用的所有插件,如果是4.x或者5.x还需要安装插件),
首先要区分使用的是filebeat直连,还是 filebeat + kafka,因为如果经过了kafka这一层以后,收到的结构会变的不一样,需要转换一下,很坑。
至于解析日志的方式—
- 我目前所使用的的思路是,写ruby脚本,分隔我定义好的日志列分隔符,从而把所需的字段一个个取出去。
- 使用gork插件通过正则表达式的方式也可以实现,但是考虑到正则匹配会比较困难,且消耗的cpu要比简单的字符串分隔要高,就放弃了这种想法。
如果使用正则表达式匹配,可以在这个网址中进行尝试http://grokdebug.herokuapp.com/。
filebeat直连logstash下的配置
input {
# stdin{} 命令行输入调试使用
beats {
port => 5044
filter {
ruby {
code =>'
message = event.get("message")
arr = message.split(" | ")
if arr.length >= 4
event.set("thread", arr[0])
event.set("time", arr[1])
event.set("method", arr[2])
event.set("url", arr[3])
event.set("base_params", arr[4])
event.set("level", arr[5])
event.set("java_class", arr[6])
event.set("log", arr[7])
fileds = event.get("fields")
event.set("server_ip",fileds["serverip"] )
event.set("app_name",fileds["appname"] )
event.set("[@metadata][show]", "is")
remove_field => [ "message", "tags", "fileds"]
output {
if[@metadata][show] == "is" {
stdout {}
filebeat + kafka + logstash配置(只增加了一个json格式化,因为从kafka过来的mq是纯字符串)
input {
kafka {
bootstrap_servers=>"xx"
topics=>["xx"]
filter {
json {
source => "message"
ruby {
code =>'
message = event.get("message")
arr = message.split(" | ")
if arr.length >= 4
event.set("thread", arr[0])
event.set("time", arr[1])
event.set("method", arr[2])
event.set("url", arr[3])
event.set("base_params", arr[4])
event.set("level", arr[5])
event.set("java_class", arr[6])
event.set("log", arr[7])
fileds = event.get("fields")
event.set("server_ip",fileds["serverip"] )
event.set("app_name",fileds["appname"] )
event.set("[@metadata][show]", "is")
remove_field => [ "message", "tags", "fileds"]
output {
if[@metadata][show] == "is" {
stdout {}
上面配置中的output都是直接打印在了控制台内,进行调试试用,如果调试完成,再换成合适的output输出,考虑到日志量庞大的问题,我选择的是放到es中进行存储。
由于日志放进ES已经被格式化了,只需要再让前端写一个很简单很简单的页面,来方便查询就可以了。
至于为什么不直接用kibana,kibana的搜索条件实在是太难用了,搜索日志应该尽可能的简单好用为主,例如我们最常用的grep功能,所以只需要放两个搜索框即可。
线上日志结构化收集 写这篇文章的起因是因为在我们的生产环境中,部署节点比较多,机器也比较多,平时看日志只能一台一台登录的去查看,很难受。单纯使用ELK的话,日志内容是非结构化的,就是一大坨文字,非常不直观,因此把我线上结构化日志的流程分享一下1.日志规范定义不管使用什么方式收集,日志的来源是一定要符合我们预想的结构的,否则日志结构化就无从谈起,所以第一步,在我们所有的java应用中首先要规范logback.xml或者其他的日志文件格式,以下是目前我规范出的日志格式 <pa
input{
file {
path =&gt; "/glusterfs/logs/dot/2018/06/28-dot-dot-app-774f45fc85-62q8f.log"
start_position =&gt; "beginning"
output {
stdo...
output插件是经过了input,然后过滤结构化数据之后,接下来我们需要借助output传到我们想传到的地方.output相当于一个输出管道。
将采集数据标准输出到控制台
output {
stdout {
codec => rubydebug
Codec 来自 Coder/decoder
两个单词的首字母缩写,Logst...
logstash
logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。
首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜...
file {
path =&gt; "/usr/local/log_test/*/*/*.log"
start_position =&gt; "beginning"
output {
stdout {
codec =&gt; json
可以看到输出的数据类型
1、 logstash的安装与启动
1)下载logstash,
https://download.elasticsearch.org/logstash/logstash/logstash-1.4.0.tar.gz
2)解压文件logstash-1.4.0.tar.gz
tar –zxvf logstash-1.4.0.tar.gz
cd logstash-1.