Я пытаюсь скопировать кластер Kafka с помощью MirrorMaker 2.0. Я использую следующие mm2.properties:
name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2
# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2
site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093
site1->site2.enabled = true
site1->site2.topics = topic1
# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
Итак, вот в чем проблема: mm2, кажется, всегда копирует сообщения x3:
# Manual message production:
kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"
# Result in the source topic (site1 cluster):
% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407
kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"
# Result in the target topic (site2 cluster):
% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3
Я пытался использовать Kafka из confluent package и kafka_2.13 -2.4.0 напрямую с Apache, оба с Debian 10.1.
Сначала я поддержал это поведение с помощью конфлюента 5.4, подумал, что это может быть ошибка в их пакете, так как у них есть репликатор, и он не должен заботиться о mm2, но я воспроизвел точно такую же проблему с kafka_2.13-2.4. 0 непосредственно от Apache без изменений.
Мне известно, что мм2 еще не идемпотентен и не может гарантировать однократную доставку. В моих тестах (я пробовал много вещей, включая настройку производителя или большую партию тысяч сообщений). Во всех этих тестах mm2 всегда дублирует X3 всех сообщений.
Я что-то пропустил, кто-то поощрял то же самое? Как заметка сайта с устаревшим mm1 с теми же пакетами, у меня нет этой проблемы.
Благодарю за любую помощь ... Спасибо!
Даже если журнал изменений не вселил в меня уверенность в улучшении, я снова попытался запустить mm2, на этот раз из kafka 2.4.1 , => Без изменений всегда эти странные дубликаты.
Я установил эту версию на новом сервере, чтобы убедиться, что странное поведение, с которым я столкнулся, не связано с сервером.
Когда я использую ACL, мне нужно специальное право? Я помещаю «все», думая, что это не может быть более всепоглощающим ... Даже если mm2 не идемпотентный, да, я попытаюсь связать это с правом.
Это удивляет меня тем больше, что я не могу найти ничего, сообщающего о проблеме, подобной этой, наверняка я должен сделать что-то не так, но что это за вопрос ...