Я использую систему Windows локально для проверки конфигурации стека ELK.ОС - Windows 10 x64 Java -
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
ES - Версия: 6.4.0LS - версия: 6.4.2fb - версия: 6.4.2
Я использую шаблон X_T- для распознавания файлов по filebeat, для передачи данных в logstash для анализа и внедрения в ES.
Мои данные JSON выглядят ниже для двух разных дат: [03/01/2019]
[
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 1, "currentData": 2},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 2, "currentData": 1},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 3, "currentData": 0},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 4, "currentData": 3},
{"ReadDate":"2019-03-01", "ReadTime":"09:52:40", "idUser": 5, "currentData": 4}
]
[03/02/2019]
[
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 1, "currentData": 3},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 2, "currentData": 2},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 3, "currentData": 1},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 4, "currentData": 4},
{"ReadDate":"2019-03-02", "ReadTime":"09:52:40", "idUser": 5, "currentData": 5}
]
Я пытаюсь добиться того, чтобы при обнаружении новой записи Logstash должен был запросить старую запись (старый экземпляр) и использовать старые currentData, которые будут вычтены из current currentData, и поместить их как usedData.Моя конфигурация в Logstash Conf приведена ниже:
input {
beats {
port => 9600
}
}
filter {
if ([fields][log_type] == "wmeter_expon_test") {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "wmeter_expon_test"
query_template => "template_test.json"
fields => {
"currentData" => "PreviousExpon"
"@timestamp" => "lastTimeStamp"
}
}
ruby {
code => 'event.set("UsedExpon", ((event.get("currentData").to_i)-(event.get("PreviousExpon").to_i)))'
}
if [WMExpon] == "FF" {
mutate {
replace => {
"[type]" => "wmeter_expon_test"
"currentData" => "-1"
}
add_field => {
"Status" => 0
}
strip => ["ReadTime"]
}
} else {
mutate {
replace => {
"[type]" => "wmeter_expon_test"
}
add_field => {
"Status" => 0
}
strip => ["ReadTime"]
}
}
fingerprint {
source => ["ReadDate", "ReadTime", "idUser"]
target => "[@metadata][fingerprint]"
method => "MURMUR3"
concatenate_sources => true
}
}
}
output {
if [type] == "wmeter_weight" {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "wmeter_info"
document_type => "_doc"
action => "create"
document_id => "%{[@metadata][fingerprint1]}"
}
}
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "%{type}"
document_type => "_doc"
document_id => "%{[@metadata][fingerprint]}"
}
if [type] == "wmeter_expon" {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "wmeter_weight"
document_type => "_doc"
action => "create"
document_id => "%{[@metadata][fingerprint1]}"
}
}
stdout { codec => rubydebug }
}
И template_test.json выглядит ниже
{
"query": {
"query_string": {
"query": "idUser:%{[idUser]} AND (currentData:>='0' OR currentData:>=0)"
}
},
"size": 1,
"sort": [{"@timestamp": "desc"}]
}
- Что происходитКогда появляется новая запись, она не может по какой-то причине запросить ES получить предыдущий экземпляр, чтобы выполнить вычитание, и, следовательно, продолжает принимать 0 как предыдущие currentData.В результате все используемые данные отображаются так же, как currentData.
- Logstash Ошибка журнала печатается ниже
Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:1 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x76830963>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:01,820][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:%{[idUser]} AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x328e7a3a>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:01,816][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:2 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x5b0f6efd>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,102][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:3 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x66847583>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,215][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:4 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0xa639f6e>, :error=>#<Faraday::ConnectionFailed>}
[2019-03-27T15:08:02,287][WARN ][logstash.filters.elasticsearch] Failed to query elasticsearch for previous event {:index=>"wmeter_expon_test", :query=>{"query"=>{"query_string"=>{"query"=>"idUser:5 AND (currentData:>='0' OR currentData:>=0)"}}, "size"=>1, "sort"=>[{"@timestamp"=>"desc"}]}, :event=>#<LogStash::Event:0x7d9ed706>, :error=>#<Faraday::ConnectionFailed>}
Может кто-нибудь помочь, пожалуйста?
- Уже проверили, что ES работает на 127.0.0.1:9200 правильно
ДанныеИнжекции работают должным образом, за исключением того факта, что они принимают неверные данные для usedData
По сути, Logstash должен запросить предыдущий экземпляр, извлечь старую переменную currentData из текущего currentData и поместить ее в поле usedData.
- Раньше это работало некоторое время назад, но недавно прекратилось.Больше не работает в производстве и / или локально.
- Ссылка https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html