И заранее благодарю за любую помощь!
Я использую Netdata для сбора метрик с серверов, а затем отправляю их в Logstash и Elastic.
Мне нужно объединить метрики с одинаковыми полями и создать одно событие, но во вложенном формате.
Это пример ввода из Netdata:
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_swap.reads 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_swap.writes 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_root.reads 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_root.writes 0.0000000 1547914548"}
Моя конфигурацияфайл logstash выглядит следующим образом:
input {
tcp {
port => 1234
}
}
filter {
# I take 'message' field and separate in different fields
grok {
named_captures_only => "true"
pattern_definitions => {
"CHART" => "[a-z]\w+"
"FAMILY" => "[_a-z0-9]+"
}
match => {
"message" => "%{WORD:prefix}\.%{WORD:hostname}\.%{CHART:chart}\.%{FAMILY:family}\.%{NOTSPACE:dimension} %{NUMBER:val} %{NUMBER:timestamp}"
}
}
if "_grokparsefailure" not in [tags] {
mutate {
remove_field => [ "@version", "host", "port", "prefix" ]
}
# Attempt to create a nested field and then aggregate
mutate {
id => "chart_field"
add_field => { "[%{chart}][%{family}][%{dimension}][value]" => "%{val}"
}
}
aggregate {
task_id => "[%{chart}][%{family}]"
code => "
# I tried many codes to aggregate but without success
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
# Remove unnecessary fields
id => "netdata_mutate_remove"
remove_field => [ "timestamp", "message"]
}
} else {
drop{}
}
output {
# TESTING PURPOSES
if "_aggregateexception" in [tags] {
file {
path => "/var/log/logstash/netdata/aggregatefailures-%{+MM-dd}.log"
}
} else {
file {
path => "/var/log/logstash/netdata/netdata-%{+MM-dd}-aggregate.log"
}
}
stdout { codec => rubydebug }
}
Возьмите приведенные выше данные:
"netdata.centosdns.disk_await.centos_swap.reads 0.0000000"
"netdata.centosdns.disk_await.centos_swap.writes 0.0000000"
Моя цель - создать вложенное поле, например:
disk_await: { # Chart
centos_swap: { # Family
[
reads => 0.0000000, # Dimension => Value
writes => 0.0000000 # Dimension => Value
]
}
}
Я претендую наагрегировать все «Измерение \« Значение »» в одну и ту же «Диаграмму» \ «Семейство», это только четыре строки метрик, но в действительности мы говорим о 1000 в секунду или даже больше, в некоторых случаях все метрики являются динамическими, это виртуальныеневозможно узнать все имена.
В данный момент я использую:
Logstash v.6.5.4 on a Virtualbox CentOS 7 minimal
All plugins (inputs/filters/outputs) updated