Kafka Подключиться, чтобы сохранить тему в индексе Elasticsearch, используя поле из сообщения (json) - PullRequest
0 голосов
/ 07 февраля 2019

Я пытаюсь индексировать сообщения в Elasticsearch, используя SMT только из Kafka Connect API.

До сих пор мне повезло с простым использованием функциональности маршрутизатора темы и метки времени.Однако теперь я хотел бы создать отдельные индексы на основе определенного поля в сообщении.

Предположим, что сообщения отформатированы следующим образом:

{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}

Можно ли как-то индексироватьэто к следующим индексам, основанным на категории продукта?

  • product-boat
  • product-helicopter
  • product-car

илиДолжен ли я создавать отдельные темы для каждой отдельной категории (зная, что их может стать сотни или тысячи)?

Наблюдаю ли я за преобразованием, которое может это сделать, или это просто невозможно, и будет ли пользовательский компонентдолжны быть построены?

Ответы [ 2 ]

0 голосов
/ 08 февраля 2019

Если вы используете Confluent Platform, вы можете выполнить какую-либо маршрутизацию в зависимости от значения поля в сообщении.

Для этого вам нужно использовать ExtractTopic SMT из Confluent.Более подробную информацию об этом SMT можно найти по адресу https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic

Kafka Sink Connector обрабатывает сообщения, которые представлены SinkRecord.Каждый SinkRecord содержит несколько полей: topic, partition, value, key и т. Д. Эти поля устанавливаются Kafka Connect, и с помощью преобразования вы можете изменить это значение.ExtractTopic SMT изменяет значение topic на основе value или key сообщения.

Конфигурация преобразований будет выглядеть примерно так:

{
...
    "transforms": "ExtractTopic",
    "transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.ExtractTopic.field": "name",  <-- name of field, that value will be used as index name
...
}

Oneограничение заключается в том, что вы должны создавать индексы заранее.

Как я предполагаю, что вы используете Elasticsearch Sink Connector .Соединитель Elasticsearch имеет возможность создавать индекс, но он делает это, когда его открывает - метод для создания устройств записи для определенного раздела (ElasticsearchSinkTask::open).В вашем случае в данный момент все индексы не могут быть созданы, потому что значения всех сообщений недоступны.

Возможно, это не самый чистый подход, потому что ExtractTopic лучше использовать для соединителей источника, но в вашем случае это может сработать.

0 голосов
/ 07 февраля 2019

Нет ничего из коробки с Kafka Connect, которая сделает это.У вас есть несколько вариантов:

  1. Соединитель приемника Elasticsearch будет маршрутизировать сообщения в целевой индекс на основе его темы, чтобы вы могли написать собственный SMT, который будет проверять сообщение и направлять его в другую тему.соответственно
  2. Используйте потоковый процессор для предварительной обработки сообщений, чтобы они уже были на разные темы к тому времени, когда они потребляются соединителем приемника Elasticsearch.Например, Kafka Streams или KSQL.
    • KSQL, вам нужно было бы жестко кодировать каждую категорию (CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat' и т. Д.)
    • Kafka Streams теперь имеет динамическую маршрутизацию ( KIP-303 ), которая была бы более гибкойспособ сделать это
  3. Ручной код изготовленного на заказ соединителя приемника Elasticsearch с кодированной логикой для маршрутизации сообщений в индексы на основе содержимого сообщения.Это похоже на худший из трех подходов ИМО.
...