Я запускаю MirrorMaker2 с драйвером высокого уровня, как описано здесь , при этом ./bin/connect-mirror-maker.sh mm2.properties
выполняется в 3 модулях в развертывании k8s.
Файл mm2.properties выглядит следующим образом :
clusters = source, dest
source.bootstrap.servers = ***:9092
dest.bootstrap.servers = ***:9092
source->dest.enabled = true
dest->source.enabled = false
source->dest.topics = event\.PROD\.some_id.*
replication.factor=3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
sync.topic.acls.enabled = false
Это работает нормально, реплицируются все темы, соответствующие регулярному выражению event\.PROD\.some_id.*
. Теперь, когда мне нужно добавить другие topi c в белый список, я ожидал, что смогу просто уменьшить масштаб, обновить регулярное выражение и снова масштабировать все.
Когда я обновляю регулярное выражение белого списка до source->dest.topics = event\.PROD\.(some_id|another_id).*
, темы, соответствующие "another_id"
, создаются в кластере dest, но данные не реплицируются, и mirrormaker, кажется, теряется при выполнении смещений:
[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-05-28 20:33:19,499] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:523)
Это ограничение драйвера высокого уровня, или я что-то не так делаю? Насколько я понимаю, возможность динамически добавлять темы в белый список была одной из мотиваций для MM2.