Журналы перезаписываются в указанном индексе под тем же _id - PullRequest
0 голосов
/ 22 мая 2019

Я использую filebeat - 6.5.1, Logstash - 6.5.1 иasticsearch - 6.5.1

Я использую несколько GROK в одном файле конфигурации и пытаюсь отправить журналы в Elasticsearch

Ниже мой Filebeat.yml

filebeat.prospectors:

type: log
paths:

var/log/message
fields:
type: apache_access
tags: ["ApacheAccessLogs"]
type: log
paths:

var/log/indicate
fields:
type: apache_error
tags: ["ApacheErrorLogs"]
type: log
paths:

var/log/panda
fields:
type: mysql_error
tags: ["MysqlErrorLogs"]
output.logstash:
The Logstash hosts
hosts: ["logstash:5044"]

Ниже мой файл конфигурации logstash -

input {
beats {
port => 5044
tags => [ "ApacheAccessLogs", "ApacheErrorLogs", "MysqlErrorLogs" ]
}
}
filter {
if "ApacheAccessLogs" in [tags] {
grok {
match => [
"message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}",
"message" , "%{COMMONAPACHELOG}+%{GREEDYDATA:extra_fields}"
]
overwrite => [ "message" ]
}
mutate {
convert => ["response", "integer"]
convert => ["bytes", "integer"]
convert => ["responsetime", "float"]
}
geoip {
source => "clientip"
target => "geoip"
add_tag => [ "apache-geoip" ]
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
remove_field => [ "timestamp" ]
}
useragent {
source => "agent"
}
}
if "ApacheErrorLogs" in [tags] {
grok {
match => { "message" => ["[%{APACHE_TIME:[apache2][error][timestamp]}] [%{LOGLEVEL:[apache2][error][level]}]( [client %{IPORHOST:[apache2][error][client]}])? %{GREEDYDATA:[apache2][error][message]}",
"[%{APACHE_TIME:[apache2][error][timestamp]}] [%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}] [pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?]( [client %{IPORHOST:[apache2][error][client]}])? %{GREEDYDATA:[apache2][error][message1]}" ] }
pattern_definitions => {
"APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
}
remove_field => "message"
}
mutate {
rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
}
date {
match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
remove_field => "[apache2][error][timestamp]"
}
}
if "MysqlErrorLogs" in [tags] {
grok {
match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} ([%{DATA:[mysql][error][level]}] )?%{GREEDYDATA:[mysql][error][message]}",
"%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} [%{DATA:[mysql][error][level]}] %{GREEDYDATA:[mysql][error][message1]}",
"%{GREEDYDATA:[mysql][error][message2]}"] }
pattern_definitions => {
"LOCALDATETIME" => "[0-9]+ %{TIME}"
}
remove_field => "message"
}
mutate {
rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
}
mutate {
rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
}
date {
match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
remove_field => "[apache2][access][time]"
}
}
}

output {
if "ApacheAccessLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "apacheaccess"
}
}
if "ApacheErrorLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "apacheerror"
}
}
if "MysqlErrorLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "sqlerror"
}
}
stdout { codec => rubydebug }
}

Данные отправленыдля упругого поиска, но для каждого document_id в одном и том же индексе создаются только 3 записи.

Создаются только 3 записи, и все новые входящие журналы перезаписываются на один и тот же document_id, а старая теряется.

Не могли бы вы, ребята, помочь мне?

1 Ответ

2 голосов
/ 22 мая 2019

Определение document_id - предоставить уникальный идентификатор документа для события.В вашем случае, поскольку они являются статическими ( apacheaccess , apacheerror , sqlerror ), будет только 1 событие на индекс, введенное вasticsearch, overide новейшей версиейevent.

Поскольку у вас есть 3 различных типа данных, то, что вы, похоже, ищете, обеспечивает для каждого типа события (ApacheAccessLogs, ApacheErrorLogs, MysqlErrorLogs) свой индекс, как показано ниже:

output {
  if "ApacheAccessLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache-access"
    }
  }
  if "ApacheErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache-error"
    }
  }
  if "MysqlErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "mysql-error"
    }
  }
  stdout {
    codec => rubydebug
  }
}

Существует не так много случаев, когда вам нужно установить идентификатор вручную (например, в случае повторной загрузки данных), так как Logstash & Elasticsearch самостоятельно справится с этим.

Но если это так, то вы можетеЕсли вы не используете поле для индивидуальной идентификации каждого события, вы можете использовать logstash-filter-fingerprint , предназначенный для этого.

...