Используйте значение поля JSON в качестве ключа записи для Kafka topi c с filebeat - PullRequest
0 голосов
/ 20 февраля 2020

У меня есть событие (JSON сообщение) в файле, которое нужно отправить в Kafka через filebeat . Сообщение JSON выглядит следующим образом:

{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}

Я хотел бы отправить это сообщение Кафке. Ключ разделения должен быть полем приложения в сообщении о событии JSON. Как я могу предоставить поле пользовательского приложения в сообщении JSON в качестве ключа разделения для записи Kafka?

Файл filebeat.yml выглядит так:

…
        output.kafka:
          version: 0.10.1
          hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
          topic: '%{[log_topic]}'
          codec.format:
            string: '%{[message]}'
          key: '%{[message.application]:default}'
          partition.hash:
            hash: []
            random: true # if false non-hashable events will be dropped
          required_acks: 1
          compression: none

https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf Согласно этой ссылке мы можем ссылаться на значения поля события, используя строку формата spe c.

При такой конфигурации сообщение по умолчанию «default» всегда сообщается как ключ. Как настроить filebeat.yml для извлечения настраиваемого поля приложения и использования этой информации в качестве ключа разделения Kafka?

Кроме того, я попытался определить поле в разделе ввода следующим образом:

type: log
  enabled: true
  paths:
  - /var/logs/*event.log
  fields:
    log_topic: "event"
    application: '%{[application]} string'
  fields_under_root: true

и соответствующий вывод kafka в виде:

output.kafka:
  version: 0.10.1
  hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
  topic: '%{[log_topic]}'
  codec.format:
    string: '%{[message]}'
  key: '%{[application]:default}'

  partition.hash:
    hash: []
    random: true # if false non-hashable events will be dropped
  required_acks: 1

Но тогда ключ разделения kafka всегда был:% {[application]} string

1 Ответ

0 голосов
/ 21 февраля 2020

Предполагая, что JSON фактически проанализирован, я думаю, что вы хотите

key: '%{[fields.application]:default}'

см. Пример - https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic -option-kafka

Вы также можете быть заинтересованным в добавлении метаданных хоста - https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html

и расшифровке JSON - https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html

...