Автоматическое погружение создаваемых тем в kafka вasticsearch - PullRequest
0 голосов
/ 02 октября 2018

У меня есть темы, создаваемые в kafka (test1, test2, test3), и я хочу сделать их более гибкими во время создания.Я пробовал themes.regex, но он создает индексы только для уже существующих тем.Как я могу вставить новую тему в индекс, когда он создается динамически?

Вот конфигурация соединителя, которую я использую для kafka-sink:

{
    "name": "elastic-sink-test-regex",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics.regex": "test[0-9]+",
        "type.name": "kafka-connect",
        "connection.url": "http://192.168.0.188:9200",
        "key.ignore": "true",
        "schema.ignore": "true",
        "schema.enable": "false",
        "batch.size": "100",
        "flush.timeout.ms": "100000",
        "max.buffered.records": "10000",
        "max.retries": "10",
        "retry.backoff.ms": "1000",
        "max.in.flight.requests": "3",
        "is.timebased.indexed": "False",
        "time.index": "at"
    }
}

1 Ответ

0 голосов
/ 12 апреля 2019

Соединитель приемника не будет читать новые темы, пока этот соединитель не будет перезапущен (или произошел запланированный перебаланс).Вы можете запустить Kafka Stream, который читает сообщения из новых тем и помещает их в тему, похожую на результат.Sink Connector выполняет чтение из темы, похожей на результат.

Чтобы сохранить соответствие «сообщение - тема», вы можете использовать Kafka Record Headers.

Убедитесь, что оно соответствует вашим требованиям!

...