Kafka-connect добавляет больше тем на лету - PullRequest
1 голос
/ 22 марта 2019

У меня есть asticsearch kafka-connect соединитель , использующий некоторые темы.
Со следующей конфигурацией:

{
    connection.url": "https://my-es-cluster:443",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "topics": "topic1,topic2",
    ...
}

Могу ли я добавить дополнительные темы вэто во время работы?
Что будет?
Что если я удалю некоторые темы из списка и добавлю их снова позже.

Я хотел бы добавить новый topic3 здесь:

{
    ...
    "topics": "topic1,topic2,topic3",
    ...
}

Что я удаляю topic2?Будут ли использоваться другие темы?:

{
    ...
    "topics": "topic1,topic3",
    ...
}

1 Ответ

1 голос
/ 22 марта 2019

Поскольку у вас уже запущены kafka и kafka-connect, вы можете использовать REST API для kafka-connect и проверить это самостоятельно: https://docs.confluent.io/current/connect/references/restapi.html

Если вы добавите новую тему (topic3), все сообщения в этой теме (в соответствии с политикой хранения) будут использованы.

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic2,topic3",
   ...
}

Проверьте состояние и настройку этого соединителя:

GET http://kafka-connect:8083/connectors/my-test-connector

Если вы хотите отключить некоторыетему, просто используйте PUT для обновления конфигурации для этого соединителя.

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic3",
   ...
}

Ничего не изменится для topic1 и topic3.Просто topic2 больше не будет потребляться.
Но если вы захотите вернуть его обратно, сообщения с topic2 будут использованы с последнего зафиксированного смещения, а не с начала.

Для каждого consumer group последнего зафиксированного offset сохраняется, не имеет значения, что вы на какое-то время удалили тему из конфигурации.В этом случае группа потребителей будет connect-my-test-connector.

Даже если вы удалите соединитель (DELETE http://kafka-connect:8083/connectors/my-test-connector), а затем создадите его снова с тем же именем, смещение будет сохранено, а потребление будетпродолжил с того момента, когда вы удалили его.(соблюдайте правила хранения, обычно это 7 дней).

...