Если вы используете Confluent Platform
, вы можете выполнить какую-либо маршрутизацию в зависимости от значения поля в сообщении.
Для этого вам нужно использовать ExtractTopic
SMT из Confluent.Более подробную информацию об этом SMT можно найти по адресу https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic
Kafka Sink Connector обрабатывает сообщения, которые представлены SinkRecord
.Каждый SinkRecord
содержит несколько полей: topic
, partition
, value
, key
и т. Д. Эти поля устанавливаются Kafka Connect, и с помощью преобразования вы можете изменить это значение.ExtractTopic
SMT изменяет значение topic
на основе value
или key
сообщения.
Конфигурация преобразований будет выглядеть примерно так:
{
...
"transforms": "ExtractTopic",
"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ExtractTopic.field": "name", <-- name of field, that value will be used as index name
...
}
Oneограничение заключается в том, что вы должны создавать индексы заранее.
Как я предполагаю, что вы используете Elasticsearch Sink Connector .Соединитель Elasticsearch имеет возможность создавать индекс, но он делает это, когда его открывает - метод для создания устройств записи для определенного раздела (ElasticsearchSinkTask::open
).В вашем случае в данный момент все индексы не могут быть созданы, потому что значения всех сообщений недоступны.
Возможно, это не самый чистый подход, потому что ExtractTopic
лучше использовать для соединителей источника, но в вашем случае это может сработать.