workflow
: input(로그 입력) → filter(로그 가공) → output (로그 출력)
: Filebeat를 통해 input을 하며, Logstash에서 가공하고, Elasticsearch에서 받고 Kibana로 출력하는 흐름
수집로그
: 수집하는 로그는 다음과 같다
: Nginx(Access, Error), OS(auth, message), MongoDB (Slowquery)
logstash 폴더 구조
➜ logstash ls -ltr
total 526032
-rw-r--r-- 1 park_hyungkyu admin 2019 1 29 12:26 jvm.options
-rw-r--r-- 1 park_hyungkyu admin 9097 1 29 12:26 log4j2.properties
-rw-r--r-- 1 park_hyungkyu admin 342 1 29 12:26 logstash-sample.conf
-rw-r--r-- 1 park_hyungkyu admin 1696 1 29 12:26 startup.options
-rw-r--r-- 1 park_hyungkyu admin 267069440 2 1 10:25 core.alpha.tar
-rw-r--r-- 1 park_hyungkyu admin 1763 2 1 18:06 logstash.conf_fail
-rw-r--r-- 1 park_hyungkyu admin 2494 2 2 11:14 logstash.conf.org
-rw-r--r-- 1 park_hyungkyu admin 3697 2 2 11:34 pipelines.yml.org
-rw-r--r-- 1 park_hyungkyu admin 11250 2 2 12:49 logstash.yml
-rw-r--r-- 1 park_hyungkyu admin 259 2 2 12:54 pipelines.yml
-rw------- 1 root admin 86198 2 2 12:55 nohup.out
drwxr-xr-x 5 park_hyungkyu admin 160 2 2 15:08 conf.d
- 기본적으로 logstash 명령어 실행 시 아무 옵션 주지 않았을 때, pipelines.yml을 읽는다.
- logstash.yml 파일은 pipeline에 대한 공통된 튜닝 옵션을 제공한다. pipelines.yml에서 별도 옵션을 주지 않으면 기본적으로 logstash.yml에 등록된 옵션을 사용한다.
- conf.d 폴더는 임의로 생성하였으며, 각 로그수집의 설정값을 저장한다.
pipelines.yml
➜ logstash cat pipelines.yml
# Single pipeline definition. path.config must be nested under the same list
# item as pipeline.id; settings not given here fall back to logstash.yml.
- pipeline.id: aplogs
  path.config: "/etc/logstash/conf.d/aplogs.conf"
logstash.yml
➜ logstash cat logstash.yml
# --- Paths ---
path.data: /var/lib/logstash
path.logs: /var/log/logstash

# --- Pipeline defaults (used when pipelines.yml does not override them) ---
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
pipeline.ordered: auto
pipeline.unsafe_shutdown: false

# --- Automatic config reload ---
config.reload.automatic: true
config.reload.interval: 60s

# --- HTTP endpoint ---
http.enabled: true
http.port: 9600

# --- Dead letter queue ---
dead_letter_queue.enable: false
dead_letter_queue.max_bytes: 1024mb
Logstash 의 config 파일
conf.d/nginx.conf
# Accept events from Beats shippers on every interface, port 5044.
input {
  beats {
    host => "0.0.0.0"
    port => 5044
    include_codec_tag => false
  }
}
filter {
# Nginx access log: parse the line, coerce numeric fields, take @timestamp
# from the logged request time, then enrich with user-agent and GeoIP data.
if "access" in [tags] {
  grok {
    match => { "message" => ["%{IPORHOST:[nginx][access][remote_addr]}, %{IPORHOST:[nginx][access][lb_addr]} - %{USERNAME:[nginx][access][remote_user]} \[%{HTTPDATE:[nginx][access][time_local]} T:%{DATA:[nginx][access][request_time]}\] \"%{WORD:[nginx][access][request]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{INT:[nginx][access][status]} %{NUMBER:[nginx][access][body_bytes_sent]} \"%{DATA:[nginx][access][http_referer]}\" \"%{DATA:[nginx][access][http_accept_encoding]}\" \"%{DATA:[nginx][access][sent_http_content_encoding]}\" \"%{DATA:[nginx][access][http_user_agent]}\" \"%{DATA:[nginx][access][sent_http_content_type]}\" \"%{DATA:[nginx][access][http_ca_http_header]}\""] }
    remove_field => "message"
  }
  # Unparsable lines are discarded; do it right after grok so the enrichment
  # filters below never run on an event that is going to be dropped anyway.
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  # Keep the ingest time in read_timestamp before date{} overwrites @timestamp.
  # The converted field names must match the grok capture names above.
  mutate {
    add_field => { "read_timestamp" => "%{@timestamp}" }
    convert => {
      "[nginx][access][http_version]" => "float"
      "[nginx][access][body_bytes_sent]" => "integer"
      "[nginx][access][request_time]" => "float"
    }
  }
  # grok stores the request time in time_local, so that is the field to parse.
  date {
    match => [ "[nginx][access][time_local]", "dd/MMM/YYYY:H:m:s Z" ]
    remove_field => "[nginx][access][time_local]"
  }
  useragent {
    source => "[nginx][access][http_user_agent]"
    target => "[nginx][access][user_agent]"
    remove_field => "[nginx][access][http_user_agent]"
  }
  # The user_agent sub-fields only exist after useragent{} has run, so their
  # type conversion has to come after it.
  mutate {
    convert => {
      "[nginx][access][user_agent][major]" => "integer"
      "[nginx][access][user_agent][minor]" => "integer"
    }
  }
  geoip {
    source => "[nginx][access][remote_addr]"
    #target => "[nginx][access][geoip]"
  }
  # Trim GeoIP attributes that are not needed in the index.
  mutate {
    remove_field => [
      "[geoip][timezone]",
      "[geoip][country_code2]",
      "[geoip][country_code3]",
      "[geoip][country_name]",
      "[geoip][continent_code]"
    ]
  }
}
# Nginx error log: parse the line, coerce numeric ids, and take @timestamp
# from the logged time.
else if "error" in [tags] {
  grok {
    #match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
    match => { "message" => ["(?<[nginx][error][time_local]>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:[nginx][error][severity]}\] %{POSINT:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )%{GREEDYDATA:[nginx][error][message]}(?:, client: (?<[nginx][error][clientip]>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:[nginx][error][server]}?)(?:, request: %{QS:[nginx][error][request]})?(?:, upstream: (?<[nginx][error][upstream]>\"%{URI}\"|%{QS}))?(?:, host: %{QS:[nginx][error][request_host]})?(?:, referrer: \"%{URI:[nginx][error][referrer]}\")?"] }
    remove_field => "message"
  }
  # Copy (rather than rename) the ingest time: renaming would strip @timestamp
  # from the event, and the date-based index name in the output depends on it.
  mutate {
    add_field => { "read_timestamp" => "%{@timestamp}" }
    convert => {
      "[nginx][error][pid]" => "integer"
      "[nginx][error][tid]" => "integer"
      "[nginx][error][connection_id]" => "integer"
    }
  }
  # grok stores the log time in time_local. If parsing fails, @timestamp keeps
  # the ingest time.
  date {
    match => [ "[nginx][error][time_local]", "YYYY/MM/dd H:m:s" ]
    remove_field => "[nginx][error][time_local]"
  }
}
# Application log: normalise continuation-line whitespace, split the line into
# timestamp / class / loglevel / location / description, and drop anything
# that does not match.
else if "app_log" in [tags] {
  mutate {
    gsub => [
      "message", "\\n\\s\\s\\s\\s\\s\\s\\s\\s|\\n\\s\\s", "\\s"
    ]
  }
  grok {
    match => {
      "message" => ["(?<timestamp>^.{23})\s(?<class>[^ ]*)\s(?<loglevel>.[^.]*)\s(?<location>.[^ ]*)\s-\s(?<description>(.|\n^.{23})+)"]
    }
    remove_field => "message"
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
# OS auth log (sshd): parse login events, take @timestamp from the syslog
# time, and add GeoIP data for the client address.
else if "auth" in [tags] {
  grok {
    match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?"] }
    pattern_definitions => {
      "GREEDYMULTILINE"=> "(.|\n)*"
    }
  }
  # Parse the timestamp field captured by grok. Syslog pads single-digit days
  # with an extra space, hence the two formats.
  date {
    match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
  geoip {
    source => "[system][auth][ssh][ip]"
    #target => "[system][auth][ssh][geoip]"
  }
}
# OS syslog (messages): parse the header fields and the possibly multi-line
# body, and take @timestamp from the syslog time.
else if "syslog" in [tags] {
  grok {
    match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
    pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
    remove_field => "message"
  }
  # Parse the timestamp field captured by grok. Syslog pads single-digit days
  # with an extra space, hence the two formats.
  date {
    match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
}
# MongoDB slow-query log: split the line into named parts, drop lines that do
# not match, and store the trailing millisecond value (op_msg) as an integer.
# NOTE(review): the capture name "serverity" looks like a typo of "severity";
# it is kept as-is because renaming it changes the indexed field name.
else if "mongo" in [tags] {
  grok {
    match => {
      "message" => ["(?<timestamp>^.{23})Z\s(?<serverity>[^ ]*)\s(?<component>[^ ]+)\s\s(?<context>[^ ]+)\s(?<process>[^ ]+)\s(?<DB>[^ ]+)\scommand:\s(?<query>[^ ]+)\s(?<total>[^ ].+)\s(?<planSummary>planSummary.+)protocol:op_msg\s(?<op_msg>.+)ms"]
    }
    remove_field => "message"
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  mutate {
    #remove_field => "total"
    convert => { "op_msg" => "integer" }
  }
}
# DigDag log: split each line into "path" and "errMsg" at a colon and drop
# lines that do not match.
else if "digdag_log" in [tags] {
  grok {
    match => {
      "message" => ["(?<path>[^ ]+):(?<errMsg>[^*]+)"]
    }
    remove_field => "message"
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
}
# Route each event to a daily Elasticsearch index chosen by its tag.
# Every branch uses the same cluster and leaves template management and
# ECS compatibility off; only the index prefix differs.
output {
  if "access" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "access-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "error" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "error-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "app_log" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "app_log-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "auth" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "auth-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "syslog" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "syslog-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "mongo" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "mongo-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
  else if "digdag_log" in [tags] {
    elasticsearch {
      hosts              => ["test:9200"]
      index              => "digdag-%{+yyyy.MM.dd}"
      manage_template    => false
      ecs_compatibility  => disabled
    }
  }
}
Nginx Access Log와 Error Log, System Log, MongoDB SlowQuery, DigDag Log에 대해 filter의 grok pattern/regex를 사용하여 파싱한다.
filebeat 설치
- filebeat.yml 파일로 filebeat를 설정하고, modules.d 폴더에서 각 로그에 대한 모듈을 저장/관리한다.
-- 다운로드 및 설치
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.2-x86_64.rpm
rpm -ivh filebeat-7.9.2-x86_64.rpm
-- 설치 확인
rpm -qa | grep filebeat
filebeat-7.9.2-1.x86_64
filebeat 설정
# cd /etc/filebeat
[root@abcd filebeat]# ls
fields.yml filebeat.reference.yml filebeat.yml modules.d
# cat filebeat.yml
# Each input tails one log source and attaches the tag that the Logstash
# filter/output sections branch on.
filebeat.inputs:
  # Application log; lines containing DEBUG/INFO/TRACE/WARN are not shipped.
  - type: log
    enabled: true
    paths:
      - /tmp/test.log
    tags: ["app_log"]
    symlinks: true
    exclude_lines: ["DEBUG|INFO|TRACE|WARN"]
  # Nginx access log; lines containing "l7check" are not shipped.
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    tags: ["access"]
    exclude_lines: [".*l7check.*"]
  # Nginx error log.
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    tags: ["error"]
  # OS syslog.
  - type: log
    enabled: true
    paths:
      - /var/log/message*
    tags: ["syslog"]
  # OS auth log.
  - type: log
    enabled: true
    paths:
      - /var/log/secure*
    tags: ["auth"]
  # MongoDB router (mongos) log.
  - type: log
    enabled: true
    paths:
      - /data/mongodb/router/log/mongos.log
    tags: ["mongo"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  # The option name is reload.enabled (not reload.enable).
  reload.enabled: true

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  host: "test:5601"

# Events go to Logstash; the direct Elasticsearch output stays disabled.
output.logstash:
  hosts: ["test:5044"]
#output.elasticsearch:
#  hosts: ["test-es:9200"]

processors:
  # - add_host_metadata: ~
  # - add_cloud_metadata: ~
  # - add_docker_metadata: ~
  # - add_kubernetes_metadata: ~
  # Strip Beats bookkeeping fields before shipping.
  - drop_fields:
      fields: ["agent.ephemeral_id", "agent.hostname", "agent.id", "agent.type", "agent.version", "ecs.version", "input.type", "log.offset", "version"]

logging.level: info
#logging.level: debug
# filebeat 모듈은 선택에 따라 사용하는 것으로 함
# filebeat 실행
nohup filebeat -e -c /etc/filebeat/filebeat.yml &