Elasticsearch: дубликаты, вызванные перезаписью файлов журнала - PullRequest
0 голосов
/ 26 февраля 2020

Я использую стек ELK. Файлы журнала сохраняются каждые 5 минут с помощью простого приложения Java. Затем Filebeat бросает их в Logsta sh. Из-за перезаписи одинаковые сообщения индексируются (их отпечатки идентичны). Единственным отличием является идентификатор документа. Elasticsearch дает новый идентификатор для документов каждый раз, когда они перезаписываются. Как избавиться от дубликатов или сохранить идентификатор документа таким же?

Logsta sh ввод:

    input {
  beats {
    port => 5044
    ssl => false
    ssl_certificate => "/etc/pki/tls/certs/logstash-beats.crt"
    client_inactivity_timeout => 200
    ssl_key => "/etc/pki/tls/private/logstash-beats.key"
  }
}
filter {
        if [fields][log_type] == "access" {
        grok {
        match => [ "message", "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{WORD:servername}/%{NOTSPACE:requestpage} HTTP/%{NUMBER:http_version}\" %{NUMBER:server_response} %{NUMBER:answer_size}" ]
    }
    }
    else  if [fields][log_type] == "errors" {
      grok {
        match => {"message" => "%{DATESTAMP:maximotime}(.*)SystemErr"}
          }
          date {
        timezone => "Europe/Moscow"
        match => ["maximotime", "dd.MM.yy HH:mm:ss:SSS"]
       }
      mutate {
         copy => { "message" => "key" }
      }
      mutate {
        gsub => [
          "message", ".*SystemErr     R ", "",
          "key", ".*SystemErr     R", ""
        ]
      }
      truncate {
        fields => "key"
        length_bytes => 255
      }
      fingerprint {
        method => "SHA1"
        source => ["key"]
      }
      if "_grokparsefailure" in [tags] {
        drop { }
      }
    } else  if [fields][log_type] == "info" {
    grok {
      match => {"message" => ["%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\) \{conditions:%{GREEDYDATA:conditions}\}",  "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \{conditions:%{GREEDYDATA:conditions}\}", "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\)"]}
      add_field => {
        "type" => "conditions"
      }
    }
      mutate {
       convert => {
        "execution" => "integer"
       }
      }
      fingerprint {
        method => "SHA1"
        source => ["message"]
      }
    if "_grokparsefailure" in [tags] {
        grok {
        match => {"message" => "%{TIMESTAMP_ISO8601:maximotime} (.*)getMboCount %{WORD:object}: mbosets \(%{WORD:mbosets}\), mbos \(%{WORD:mbos}\)"}
        add_field => {
        "type" => "maximoObjectCount"
        }
        remove_tag => ["_grokparsefailure"]
        }
        mutate {
        convert => {
         "mbosets" => "integer"
         "mbos" => "integer"
        }
        }
        fingerprint {
          method => "SHA1"
          source => ["message"]
        }
        if "_grokparsefailure" in [tags] {
          drop { }
        }
    }

    date {
        timezone => "Europe/Moscow"
        match => ["maximotime", "yyyy-MM-dd HH:mm:ss:SSS"]
        target => "maximotime"
       }
    }
}

Logsta sh вывод:

output {
    stdout {codec => rubydebug}
    if [fields][log_type] == "access" {
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    } else if [fields][log_type] == "errors"{
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    } else if [fields][log_type] == "info"{
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
            document_id => "%{fingerprint}"
        }
    }
}

Filebeat.yml:

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
- add_cloud_metadata: ~

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/integration/*.log
  fields:  {log_type: access}
- type: log
  enabled: true
  paths:
    - /var/log/maximo_error_logs/*.log
  fields:  {log_type: errors}
  exclude_lines: '^((\*+)|Log file started at:)'
  multiline.pattern: '(^$|(\t|\s)at .*|.*Caused by:.*|.*SystemErr( ){5}R[ \t]{2}at .*|^ru.ocrv..*|^(\s|\t|)null.*|Обратитесь за.*|.*Закрытое со.*|^(\s|\t|)(ORA-.*|BMX.*)|^(\\s|\t)[А-Яа-я].*)|(.*\d more$)'
  multiline.negate: false
  multiline.match: after
- type: log
  enabled: true
  paths:
    - /var/log/maximo_logs/*.log
  fields:  {log_type: info}

output.logstash:
  hosts: ["elk:5044"]
  bulk_max_size: 200

1 Ответ

0 голосов
/ 28 февраля 2020

Я тупой. Я перезапускал контейнер Filebeat вместо ELK, поэтому мои настройки Logsta sh не применялись ... Теперь он работает, и моя конфигурация вывода Logsta sh выглядит следующим образом:

document_id => "%{type}-%{fingerprint}"
action => "create"
...