Свободный маршрут на основе типа полезной нагрузки - PullRequest
0 голосов
/ 05 марта 2019

Я новичок в Fluentd.Я использую kubernetes daemonset fluentd для сбора логов из док-контейнеров и отправки их в kafka.У меня есть еще одна служба kubernetes, которая принимает сообщения от kafka и отправляет их вasticsearh, а затем kibana.Я хочу, чтобы поле log записи Fluentd было разделено на отдельные поля для дальнейшего использования в поисковых запросах kibana.Например:

исходная запись:

"log" : "{\"foo\" : \"bar\"}"

Вывод:

"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"

Я придумал следующую конфигурацию:

<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
 <parse>
   @type json
 </parse>
</filter>

<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>

Нок сожалению, не все журналы имеют формат json, в результате чего парсерам json не удается разобрать простой текст: ParserError error="pattern not match with data

Возможно ли применить парсер json, только если поле log является допустимым объектом json?Если это обычный текст, я бы хотел, чтобы его отправили как есть.

1 Ответ

0 голосов
/ 06 марта 2019

Нашел эту библиотеку https://github.com/ninadpage/fluent-plugin-parser-maybejson/

она не работает с последним fluentd, создаст PR для решения этой проблемы, достаточно добавить: "require 'fluent / parser'"

ОБНОВЛЕНИЕ: после обновления версия плагина fluentd не работает, как описано в README.Я нашел еще один, который работает с fluentd 1.4.0

fluent-plugin-multi-format-parser

, и в итоге имел следующую конфигурацию:

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type multi_format
    <pattern>
      format json
    </pattern>
    <pattern>
      format none
    </pattern>
  </parse>
</filter>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type record_transformer
  remove_keys message  # remove message from non jsom logs
</filter>
...