Как исправить «Фарадей :: ConnectionFailed» в logstash - PullRequest
0 голосов
/ 27 марта 2019

Я использую систему Windows локально для проверки конфигурации стека ELK.ОС - Windows 10 x64 Java -

java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

ES - Версия: 6.4.0LS - версия: 6.4.2fb - версия: 6.4.2

Я использую шаблон X_T- для распознавания файлов по filebeat, для передачи данных в logstash для анализа и внедрения в ES.

Мои данные JSON выглядят ниже для двух разных дат: [03/01/2019]

[
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 1, "currentData": 2},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 2, "currentData": 1},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 3, "currentData": 0},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 4, "currentData": 3},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 5, "currentData": 4}
]

[03/02/2019]

[
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 1, "currentData": 3},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 2, "currentData": 2},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 3, "currentData": 1},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 4, "currentData": 4},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 5, "currentData": 5}
]

Я пытаюсь добиться того, чтобы при обнаружении новой записи Logstash должен был запросить старую запись (старый экземпляр) и использовать старые currentData, которые будут вычтены из current currentData, и поместить их как usedData.Моя конфигурация в Logstash Conf приведена ниже:

input {
  beats {
    port => 9600
  }
}

filter {
  if ([fields][log_type] == "wmeter_expon_test") {
    elasticsearch {
       hosts => ["http://127.0.0.1:9200"]
       index => "wmeter_expon_test"
       query_template => "template_test.json"
        fields => { 
                "currentData" => "PreviousExpon"
                "@timestamp" => "lastTimeStamp"
            }
    }
    ruby {
        code => 'event.set("UsedExpon", ((event.get("currentData").to_i)-(event.get("PreviousExpon").to_i)))'
    }  
    if [WMExpon] == "FF" {
        mutate {
          replace => {
            "[type]" => "wmeter_expon_test"
            "currentData" => "-1"
          }
          add_field => { 
            "Status" => 0
          }
          strip => ["ReadTime"]
        }
    } else {  
        mutate {
          replace => {
            "[type]" => "wmeter_expon_test"
          }
          add_field => { 
            "Status" => 0
          }
          strip => ["ReadTime"]
        }
    }
    fingerprint {
        source => ["ReadDate", "ReadTime", "idUser"]
        target => "[@metadata][fingerprint]"
        method => "MURMUR3"
        concatenate_sources => true
    }
  }
}

output {
    if [type] == "wmeter_weight" {
        elasticsearch {  
            hosts => ["http://127.0.0.1:9200"]
            index => "wmeter_info"
            document_type => "_doc"
            action => "create"
            document_id => "%{[@metadata][fingerprint1]}"
        }
    }
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "%{type}"
      document_type => "_doc"
      document_id => "%{[@metadata][fingerprint]}"
    }
    if [type] == "wmeter_expon" {
        elasticsearch {  
            hosts => ["http://127.0.0.1:9200"]
            index => "wmeter_weight"
            document_type => "_doc"
            action => "create"
            document_id => "%{[@metadata][fingerprint1]}"
        }
    }
    stdout { codec => rubydebug }
}

И template_test.json выглядит ниже

{
    "query": {
            "query_string": {
                "query": "idUser:%{[idUser]} AND (currentData:>='0' OR currentData:>=0)"
            }
        },
    "size": 1, 
    "sort": [{"@timestamp": "desc"}]
}
  • Что происходитКогда появляется новая запись, она не может по какой-то причине запросить ES получить предыдущий экземпляр, чтобы выполнить вычитание, и, следовательно, продолжает принимать 0 как предыдущие currentData.В результате все используемые данные отображаются так же, как currentData.
  • Logstash Ошибка журнала печатается ниже
Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:1 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x76830963>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:01,820][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:%{[idUser]} AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x328e7a3a>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:01,816][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:2 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x5b0f6efd>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,102][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:3 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x66847583>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,215][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:4 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0xa639f6e>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,287][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:5 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x7d9ed706>, :error=>#<Faraday::ConnectionFailed>}

Может кто-нибудь помочь, пожалуйста?

  • Уже проверили, что ES работает на 127.0.0.1:9200 правильно
  • ДанныеИнжекции работают должным образом, за исключением того факта, что они принимают неверные данные для usedData

  • По сути, Logstash должен запросить предыдущий экземпляр, извлечь старую переменную currentData из текущего currentData и поместить ее в поле usedData.

  • Раньше это работало некоторое время назад, но недавно прекратилось.Больше не работает в производстве и / или локально.
  • Ссылка https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html
...