Кафка MirrorMaker 2.0 дублирует каждое сообщение - PullRequest
1 голос
/ 31 января 2020

Я пытаюсь скопировать кластер Kafka с помощью MirrorMaker 2.0. Я использую следующие mm2.properties:

name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2

# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2

site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093

site1->site2.enabled = true
site1->site2.topics = topic1


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

Итак, вот в чем проблема: mm2, кажется, всегда копирует сообщения x3:

# Manual message production: 

 kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"


# Result in the source topic (site1 cluster): 

% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407

 kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"

# Result in the target topic (site2 cluster): 

% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3

Я пытался использовать Kafka из confluent package и kafka_2.13 -2.4.0 напрямую с Apache, оба с Debian 10.1.

Сначала я поддержал это поведение с помощью конфлюента 5.4, подумал, что это может быть ошибка в их пакете, так как у них есть репликатор, и он не должен заботиться о mm2, но я воспроизвел точно такую ​​же проблему с kafka_2.13-2.4. 0 непосредственно от Apache без изменений.

Мне известно, что мм2 еще не идемпотентен и не может гарантировать однократную доставку. В моих тестах (я пробовал много вещей, включая настройку производителя или большую партию тысяч сообщений). Во всех этих тестах mm2 всегда дублирует X3 всех сообщений.

Я что-то пропустил, кто-то поощрял то же самое? Как заметка сайта с устаревшим mm1 с теми же пакетами, у меня нет этой проблемы.

Благодарю за любую помощь ... Спасибо!


Даже если журнал изменений не вселил в меня уверенность в улучшении, я снова попытался запустить mm2, на этот раз из kafka 2.4.1 , => Без изменений всегда эти странные дубликаты.

Я установил эту версию на новом сервере, чтобы убедиться, что странное поведение, с которым я столкнулся, не связано с сервером.

Когда я использую ACL, мне нужно специальное право? Я помещаю «все», думая, что это не может быть более всепоглощающим ... Даже если mm2 не идемпотентный, да, я попытаюсь связать это с правом.

Это удивляет меня тем больше, что я не могу найти ничего, сообщающего о проблеме, подобной этой, наверняка я должен сделать что-то не так, но что это за вопрос ...

1 Ответ

1 голос
/ 14 апреля 2020

Включение идемпотентности в конфигурацию клиента решит проблему. По умолчанию будет установлено значение false. Добавьте приведенное ниже в файл mm2.properties

source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true
...