
Workflow

 : input (log ingestion) → filter (log processing) → output (log shipping)

 : Filebeat collects the logs (input), Logstash processes them (filter), Elasticsearch stores them, and Kibana visualizes them (output).

 

Collected logs

  : The following logs are collected:

  : Nginx (access, error), OS (auth, messages), MongoDB (slow query)

 

Logstash directory structure

➜  logstash ls -ltr
total 526032
-rw-r--r--  1 park_hyungkyu  admin       2019  1 29 12:26 jvm.options
-rw-r--r--  1 park_hyungkyu  admin       9097  1 29 12:26 log4j2.properties
-rw-r--r--  1 park_hyungkyu  admin        342  1 29 12:26 logstash-sample.conf
-rw-r--r--  1 park_hyungkyu  admin       1696  1 29 12:26 startup.options
-rw-r--r--  1 park_hyungkyu  admin  267069440  2  1 10:25 core.alpha.tar
-rw-r--r--  1 park_hyungkyu  admin       1763  2  1 18:06 logstash.conf_fail
-rw-r--r--  1 park_hyungkyu  admin       2494  2  2 11:14 logstash.conf.org
-rw-r--r--  1 park_hyungkyu  admin       3697  2  2 11:34 pipelines.yml.org
-rw-r--r--  1 park_hyungkyu  admin      11250  2  2 12:49 logstash.yml
-rw-r--r--  1 park_hyungkyu  admin        259  2  2 12:54 pipelines.yml
-rw-------  1 root           admin      86198  2  2 12:55 nohup.out
drwxr-xr-x  5 park_hyungkyu  admin        160  2  2 15:08 conf.d

- By default, when the logstash command is run with no options, it reads pipelines.yml.

- logstash.yml provides the common tuning options for pipelines. Any setting not overridden per pipeline in pipelines.yml falls back to the value registered in logstash.yml.

- The conf.d directory was created manually; it holds the config files for each collected log.

 

pipelines.yml

➜  logstash cat pipelines.yml
- pipeline.id: aplogs
  path.config: "/etc/logstash/conf.d/aplogs.conf"
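If more pipelines are added later, per-pipeline overrides can also go here; anything not set per pipeline falls back to logstash.yml. A sketch (the second pipeline and its values are hypothetical):

```yaml
- pipeline.id: aplogs
  path.config: "/etc/logstash/conf.d/aplogs.conf"
- pipeline.id: nginx                # hypothetical second pipeline
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 4               # overrides pipeline.workers from logstash.yml
```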

 

logstash.yml

➜  logstash cat logstash.yml
path.data: /var/lib/logstash
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
pipeline.unsafe_shutdown: false
pipeline.ordered: auto
config.reload.automatic: true
config.reload.interval: 60s
http.enabled: true
http.port: 9600
dead_letter_queue.enable: false
dead_letter_queue.max_bytes: 1024mb
path.logs: /var/log/logstash
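With http.enabled: true and http.port: 9600 above, Logstash exposes its monitoring API; a quick health/stats check (assuming Logstash runs on localhost):

```shell
# basic node info; confirms the API is up
curl -s 'localhost:9600/?pretty'
# per-pipeline event counts and worker stats
curl -s 'localhost:9600/_node/stats/pipelines?pretty'
```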

 

Logstash config files

 

conf.d/nginx.conf

input {
  beats {
    port => 5044
    host => "0.0.0.0"
    include_codec_tag => false
  }
}
filter {
    if "access" in [tags] {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_addr]}, %{IPORHOST:[nginx][access][lb_addr]} - %{USERNAME:[nginx][access][remote_user]} \[%{HTTPDATE:[nginx][access][time_local]} T:%{DATA:[nginx][access][request_time]}\] \"%{WORD:[nginx][access][request]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{INT:[nginx][access][status]} %{NUMBER:[nginx][access][body_bytes_sent]} \"%{DATA:[nginx][access][http_referer]}\" \"%{DATA:[nginx][access][http_accept_encoding]}\" \"%{DATA:[nginx][access][sent_http_content_encoding]}\" \"%{DATA:[nginx][access][http_user_agent]}\" \"%{DATA:[nginx][access][sent_http_content_type]}\" \"%{DATA:[nginx][access][http_ca_http_header]}\""] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
        convert => {
         "[nginx][access][http_version]" => "float"
         "[nginx][access][user_agent][major]" => "integer"
         "[nginx][access][body_bytes_sent]" => "integer"
         "[nginx][access][user_agent][minor]" => "integer"
         "[nginx][access][request_time]" => "float"
        }
      }
      date {
        match => [ "[nginx][access][time_local]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time_local]"
      }
      useragent {
        source => "[nginx][access][http_user_agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][http_user_agent]"
      }
      geoip {
        source => "[nginx][access][remote_addr]"
        #target => "[nginx][access][geoip]"
      }
      mutate {
        remove_field => "[geoip][timezone]"
        remove_field => "[geoip][country_code2]"
        remove_field => "[geoip][country_code3]"
        remove_field => "[geoip][country_name]"
        remove_field => "[geoip][continent_code]"
      }
      if "_grokparsefailure" in [tags] {
           drop { }
       }
    }
    else if "error" in [tags] {
      grok {
        #match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        match => { "message" => ["(?<[nginx][error][time_local]>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:[nginx][error][severity]}\] %{POSINT:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )%{GREEDYDATA:[nginx][error][message]}(?:, client: (?<[nginx][error][clientip]>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:[nginx][error][server]}?)(?:, request: %{QS:[nginx][error][request]})?(?:, upstream: (?<[nginx][error][upstream]>\"%{URI}\"|%{QS}))?(?:, host: %{QS:[nginx][error][request_host]})?(?:, referrer: \"%{URI:[nginx][error][referrer]}\")?"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
        convert => {
          "[nginx][error][pid]" => "integer"
          "[nginx][error][tid]" => "integer"
          "[nginx][error][connection_id]" => "integer"
        }
      }
      #date {
      #  match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
      #  remove_field => "[nginx][error][time]"
      #}
    }
    else if "app_log" in [tags] {
        mutate {
          gsub => [ "message", "\\n\\s\\s\\s\\s\\s\\s\\s\\s|\\n\\s\\s", " " ]
        }
        grok {
          match => { "message" => ["(?<timestamp>^.{23})\s(?<class>[^ ]*)\s(?<loglevel>.[^.]*)\s(?<location>.[^ ]*)\s-\s(?<description>(.|\n^.{23})+)"] }
          remove_field => "message"
       }
        if "_grokparsefailure" in [tags] {
          drop { }
       }
    }
    else if "auth" in [tags] {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
      }
      date {
        match => [ "time_local", "UNIX" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        #target => "[system][auth][ssh][geoip]"
      }
    }
    else if "syslog" in [tags] {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "time_local", "UNIX" ]
      }
    }
    else if "mongo" in [tags] {
      grok {
        match => { "message" => ["(?<timestamp>^.{23})Z\s(?<severity>[^ ]*)\s(?<component>[^ ]+)\s\s(?<context>[^ ]+)\s(?<process>[^ ]+)\s(?<DB>[^ ]+)\scommand:\s(?<query>[^ ]+)\s(?<total>[^ ].+)\s(?<planSummary>planSummary.+)protocol:op_msg\s(?<op_msg>.+)ms"] }
        remove_field => "message"
        }
      if "_grokparsefailure" in [tags] {
           drop { }
       }
      mutate {
        #remove_field => "total"
        convert => {
         "op_msg" => "integer"
        }
      }
    }
    else if "digdag_log" in [tags] {
      grok {
        match => { "message" => ["(?<path>[^ ]+):(?<errMsg>[^*]+)"] }
        remove_field => "message"
        }
      if "_grokparsefailure" in [tags] {
           drop { }
       }
    }
}
 
output {
  if "access" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "access-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
   else if "error" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "error-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
    else if "app_log" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "app_log-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
   else if "auth" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        ecs_compatibility => disabled
        index => "auth-%{+yyyy.MM.dd}"
        }
    }
   else if "syslog" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "syslog-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
   else if "mongo" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "mongo-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
   else if "digdag_log" in [tags] {
    elasticsearch {
        hosts => ["test:9200"]
        manage_template => false
        index => "digdag-%{+yyyy.MM.dd}"
        ecs_compatibility => disabled
        }
    }
}
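Before (re)starting Logstash, the pipeline config can be syntax-checked; the binary path below assumes a package install:

```shell
# --config.test_and_exit (-t) parses the config and reports errors without starting the pipeline
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf --config.test_and_exit
```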

The Nginx access and error logs, system logs, MongoDB slow queries, and Digdag logs are each parsed in the filter block using grok patterns/regex.
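Grok named captures use the same (?&lt;name&gt;...) syntax as ordinary regex, so patterns that avoid %{...} macros, like the app_log pattern above, can be sanity-checked outside Logstash. A minimal sketch in Python (the sample log line is made up; Python spells named groups (?P&lt;name&gt;...)):

```python
import re

# Rough Python translation of the app_log grok pattern above.
# Grok's (?<name>...) becomes Python's (?P<name>...); the rest is plain regex.
APP_LOG = re.compile(
    r"(?P<timestamp>^.{23})\s(?P<class>[^ ]*)\s(?P<loglevel>.[^.]*)\s"
    r"(?P<location>.[^ ]*)\s-\s(?P<description>(.|\n^.{23})+)",
    re.MULTILINE,  # lets ^ match after newlines in multi-line stack traces
)

# Made-up sample line: 23-char timestamp, thread, level, logger, message
sample = "2021-02-02 12:34:56.789 [main] ERROR c.e.App - something failed"
m = APP_LOG.match(sample)
print(m.group("loglevel"), m.group("location"))  # ERROR c.e.App
```

If a sample line fails to match here, it will also produce a _grokparsefailure in Logstash (and be dropped by the filter above), so this kind of offline check saves a deploy cycle.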

 

Filebeat installation

  • Filebeat is configured via the filebeat.yml file, and the modules for individual logs are stored/managed under modules.d
-- download and install
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.2-x86_64.rpm
rpm -ivh filebeat-7.9.2-x86_64.rpm
 
-- verify the installation
rpm -qa | grep filebeat
filebeat-7.9.2-1.x86_64

 

Filebeat configuration

# cd /etc/filebeat
 
[root@abcd filebeat]# ls
fields.yml  filebeat.reference.yml  filebeat.yml  modules.d
 
 
# cat filebeat.yml
 
filebeat.inputs:
 - type: log
   enabled: true
   paths:
     - /tmp/test.log
   tags: ["app_log"]
   symlinks: true
   exclude_lines: ["DEBUG|INFO|TRACE|WARN"]
 
 - type: log
   enabled: true
   paths:
     - /var/log/nginx/access.log
   tags: ["access"]
   exclude_lines: [".*l7check.*"]
 
 - type: log
   enabled: true
   paths:
     - /var/log/nginx/error.log
   tags: ["error"]
 
 - type: log
   enabled: true
   paths:
     - /var/log/message*
   tags: ["syslog"]
 
 - type: log
   enabled: true
   paths:
     - /var/log/secure*
   tags: ["auth"]
 
 - type: log
   enabled: true
   paths:
     - /data/mongodb/router/log/mongos.log
   tags: ["mongo"]
 
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enable: true
 
setup.template.settings:
  index.number_of_shards: 1
 
setup.kibana:
  host: "test:5601"
 
output.logstash:
  hosts: ["test:5044"]
 
#output.elasticsearch:
#  hosts: ["test-es:9200"]
 
processors:
#  - add_host_metadata: ~
#  - add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~
   - drop_fields:
       fields: ["agent.ephemeral_id", "agent.hostname", "agent.id", "agent.type", "agent.version", "ecs.version", "input.type", "log.offset", "version"]
 
logging.level: info
#logging.level: debug
 
 
 
# Filebeat modules are optional; enable them only as needed
 
# run Filebeat
nohup filebeat -e -c /etc/filebeat/filebeat.yml &
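If the bundled modules are preferred over the raw log inputs above, they are toggled with the filebeat CLI; running under systemd is an alternative to nohup:

```shell
# list available/enabled modules, then enable one (writes modules.d/nginx.yml)
filebeat modules list
filebeat modules enable nginx
# validate the config and the connection to the configured output
filebeat test config
filebeat test output
# alternative to nohup: run as a service
systemctl enable --now filebeat
```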

 
