Я новичок в Fluentd.Я использую kubernetes daemonset fluentd для сбора логов из док-контейнеров и отправки их в kafka.У меня есть еще одна служба kubernetes, которая принимает сообщения от kafka и отправляет их вasticsearh, а затем kibana.Я хочу, чтобы поле log
записи Fluentd было разделено на отдельные поля для дальнейшего использования в поисковых запросах kibana.Например:
исходная запись:
"log" : "{\"foo\" : \"bar\"}"
Вывод:
"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"
Я придумал следующую конфигурацию:
<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>
Нок сожалению, не все журналы имеют формат json, в результате чего парсерам json не удается разобрать простой текст: ParserError error="pattern not match with data
Возможно ли применить парсер json, только если поле log
является допустимым объектом json?Если это обычный текст, я бы хотел, чтобы его отправили как есть.