У меня есть событие (JSON сообщение) в файле, которое нужно отправить в Kafka через filebeat . Сообщение JSON выглядит следующим образом:
{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}
Я хотел бы отправить это сообщение Кафке. Ключ разделения должен быть полем приложения в сообщении о событии JSON. Как я могу предоставить поле пользовательского приложения в сообщении JSON в качестве ключа разделения для записи Kafka?
Файл filebeat.yml выглядит так:
…
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[message.application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
compression: none
https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf Согласно этой ссылке мы можем ссылаться на значения поля события, используя строку формата spe c.
При такой конфигурации сообщение по умолчанию «default» всегда сообщается как ключ. Как настроить filebeat.yml для извлечения настраиваемого поля приложения и использования этой информации в качестве ключа разделения Kafka?
Кроме того, я попытался определить поле в разделе ввода следующим образом:
type: log
enabled: true
paths:
- /var/logs/*event.log
fields:
log_topic: "event"
application: '%{[application]} string'
fields_under_root: true
и соответствующий вывод kafka в виде:
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
Но тогда ключ разделения kafka всегда был:% {[application]} string