用logstash收集nginx日志,并输出至node.js API

最近在做日志系统,一直在研究logstash+elasticsearch的实现方案。

logstash是一个文本内容的收集工具,elasticsearch是一个检索引擎,理想状态下,logstash收集并且整理好的内容,发送到elasticsearch的索引内,可以实现全文检索

logstash,和elasticsearch的配置相对都简单,你只要在本机安装了java环境后,再去https://www.elastic.co/下载logstash和elasticsearch,然后对他们进行解压,可以说,到这一步这个事情基本就算做完了。

不过要想让她按照你想要的工作开始,还是要进行一点点简单的配置,在logstash的bin目录下创建logstash.conf文件,然后简单配置一下你刚才创建的logstash.conf,文件,文件内容基本如下

input是要采集的日志文件的路径,我主要要分析nginx

input {
    file {
        path => ["access.log"]
        type => "nginx-access"
        start_position => "beginning"
    }

    file {
        path => ["error.log"]
        type => "nginx-error"
        start_position => "beginning"
    }
    stdin{
    }
}

filter是用于筛选数据的,可以用grok来匹配出你要的字段格式

filter {  
    if [type] == "nginx-access" {
        grok{
            match =>    ["message","%{IPORHOST:client_ip}\s{1,}\-\s\-\s\[%{HTTPDATE:timestamp}\]\s{1,}\"(?:%{WORD:verb}\s{1,}%{NOTSPACE:request}(?:\s{1,}HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response}\s{1,}(?:%{NUMBER:bytes}|-)\s{1,}%{QS:referrer}\s{1,}%{QS:agent}"]
        }

        ruby{
            code => "event.set('logdateunix',event.get('logdate').to_i)"
        }
    } else if [type] == "nginx-error" { 
        grok {
        match => [
            "message", "(?<time>\d{4}/\d{2}/\d{2}\s{1,}\d{2}:\d{2}:\d{2})\s{1,}\[%{DATA:err_severity}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:,\s{1,}client:\s{1,}(?<client_ip>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:client_ip})?(?:, referrer: \"%{URI:referrer})?"
            ]
        }
        ruby{
            code => "event.set('logdateunix',event.get('logdate').to_i)"
        }
    } 
}

output是要将日志发送到什么地方,elasticsearch是配置elasticsearch的地址和索引

output {
    elasticsearch { 
                hosts => ["localhost:9200"] 
                index => "logstash-%{+YYYY.MM.dd}"
            }
    stdout {
        codec => rubydebug
    }
}

然后启动logstash

logstash -f logstash.conf

启动elasticsearch

elasticsearch.bat

基本就可以了,然后在用DSL查数据,这就是基本流程。我用的是head

不过最终考虑到成本问题以及现实的需求,还是放弃了elasticsearch,只是用logstash最为日志收集工具,elasticsearch就不适用了,研究再三决定在logstash输出的时候,让它输出到一个远程的API,再由这个API,存入一个数据库中,然后每天定时分析这个数据库中前一天的数据,然后分析出想要的数据后,将前一天的数据删除,将分析后的结果在存入另一个数据库中。

研究了logstash的输出插件,发现有http输出,可以选择post或者是其他的提交方式,这样就满足了使用一个API作为数据临时存储的要求

output{
    http {
        content_type=>"application/x-www-form-urlencoded"
        url => "http://127.0.0.1:6688/"
        http_method => "post"
        format => "form"
    }
}
  1. content_type: 值类型是string,默认没有设置该参数,如果没有特别指明,json格式是application/json,form格式是application/x-www-form-urlencoded
  2. url: 值是一个字符串,默认没有设置。可以让你使用http或者https进行put或者post。
  3. http_method: 值可以是put或者post,默认没有设置。
  4. format: 值可以是json、form、message三种类型,默认是json。用来设置http body的格式,如果是form格式,http body会被影射成一个查询字符串(foo=bar&baz=fizz…)

这样,在node.js端可以用request方式接受到form表单的内容,然后在进行分析。