У меня есть очень простой скрипт logstash, сохраняющий файл, содержащий несколько сообщений XML, вasticsearch.Например,
<message>
<timestamp></timestamp>
<payload>
<Fruit>
<Apple>
...
</Apple>
</Fruit>
</payload>
</message>
<message>
<timestamp></timestamp>
<payload>
<Fruit>
<Pear>
...
</Pear>
</Fruit>
</payload>
</message>
<message>
<timestamp></timestamp>
<payload>
<Fruit>
<Banana>
...
</Banana>
<Orange>
...
</Orange>
</Fruit>
</payload>
</message>
Это приводит к следующему в эластичном поиске:
{
"_source": {
"@timestamp":
"message": {
"payload": {
"Fruit": {
"Apple": {
...
}
}
}
}
}
},
{
"_source": {
"@timestamp":
"message": {
"payload": {
"Fruit": {
"Pear": {
...
}
}
}
}
}
},
{
"_source": {
"@timestamp":
"message": {
"payload": {
"Fruit": [
{
"Banana": {
...
}
},
{
"Orange": {
...
}
}
]
}
}
}
}
То, что я сейчас хотел бы сделать, это получить метрику всех различных типов сообщений с течением времени.Более конкретно, подсчет различных видов фруктов с течением времени.Я понимаю, как агрегировать значения полей, а не имена полей, как это.Обратите внимание, что это очень упрощенный пример, и у меня более 50 различных сообщений, которые могут измениться, поэтому фильтр типа if / else в logstash, создающий поле fruit_type
, не будет идеальным для целей обслуживания.
ОБНОВЛЕНИЕ
Мне удалось что-то сделать с рубиновым фильтром в logstash, но я все равно хотел бы знать, является ли это лучшим / оптимальным способом достижения того, чего я хочу:
ruby {
code => '
payload_type = []
event.get("[messages][payload][Fruit]").each { |k|
payload_type << "messages.payload.Fruit." + k[0]
}
event.set("[payload_type]", payload_type)
'
}