Я использую последнюю версию logsta sh (7.6.2). Я пытаюсь объединить строки из двух разных файлов , используя общий идентификатор. Найдите ниже образцы из двух источников данных. Также найдите ниже желаемый результат.
Пример данных 1 со столбцами ID, Страна, Штат
111 US NY
112 IN KA
113 US MA
Пример данных 2 со столбцами ID и информация
111 abc
111 abd
112 xyz
112 xya
113 qwe
113 qwr
Желаемый результат со слиянием
111 abc US NY
111 abd US NY
112 xyz IN KA
112 xya IN KA
113 qwe US MA
113 qwr US MA
Я пробовал использовать агрегат, как показано в приведенном ниже файле конфигурации. Я также установил конвейерные рабочие на 1 и отключил выполнение java, добавив следующие команды в logsta sh .yml
pipeline.workers: 1
pipeline.java_execution: false
Файл конфигурации
input {
file{
path => ["C:/Users/Nani/Sample_Data/Mapping.txt"]
path => ["C:/Users/Nani/Sample_Data/sample.log"]
start_position => "beginning"
sincedb_path => "NUL"
}
}
filter{
grok {
match => ["message","%{INT:ID},%{WORD:COUNTRY},%{GREEDYDATA:CITY},%{GREEDYDATA:STATE},%{GREEDYDATA:Location}\r"]
match => ["message", "%{TIMESTAMP_ISO8601:timestamp}\s\s%{LOGLEVEL:LOGLEVEL1}\s%{WORD:Info}:%{INT:ID}\s\-\s%{GREEDYDATA:LOGLEVEL2}\r"]
remove_field => ["message"]
}
if "_grokparsefailure" in [tags] {drop{}}
mutate{
convert => { "ID" => "string" }
}
if [Country] =~ /.+/ {
aggregate {
task_id => "%{ID}"
code => "
map['country'] = event.get('Country')
map['state'] = event.get('State')
event.cancel()
"
}
drop{}
}
if ![Country] {
aggregate {
task_id => "%{ID}"
code => "
event.set('Country', map['country'])
event.set('State', map['state'])
event.cancel()
"
}
}
}
Мне удалось получить результаты слияния для образца, но когда я пробую его на всех данных, только несколько строк обновляются с объединенными столбцами вместо всех строк.
Мне здесь что-то не хватает? Любая помощь здесь приветствуется. TIA