Как скопировать какое-нибудь сообщение из одной топики c в kafka из bash? - PullRequest
1 голос
/ 02 апреля 2020

Просьба помочь

У нас есть 2 kafka topi c. Я хочу скопировать 10 сообщений из начала темы 1 в тему 2.

Я пытаюсь сделать это с kafka-console-consumer и kafka-console-серийный производитель

Сначала я сохраняю 10 сообщений из topic1 в некоторый каталог:

for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;

, затем я пытаюсь с kafka-console-продюсер отправить его в topic2:

for (( i=1; i<=10; i++ )); do  bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;

И я получил ошибка - мой сервис не может десериализовать данные. Мой вопрос:

  1. сработает ли мое решение?
  2. Почему я могу получить эту ошибку?
  3. Каков наилучший способ скопировать сообщение из одного topi c в другой один раз?

UPD: Как я` Я решаю эту проблему (спасибо: Робин Моффатт): Я использую kafka-mirror и этот jar: https://github.com/opencore/mirrormaker_topic_rename, с этим я могу скопировать сообщение из одной topi c kafka в другую на одном кластере

1 Ответ

2 голосов
/ 02 апреля 2020

Вы можете сделать это с помощью kafkacat:

kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
kafkacat -b localhost:9092 -P -t target-topic -K: 
  • | перенаправляет вывод первого kafkacat (который является потребителем -C) в вход второго kafkacat (который является -P продюсер)
  • -c10 означает просто потреблять 10 сообщений
  • -o beginning означает начало в начале topi c.

Обратите внимание, что это не сработает, если у вас есть двоичные данные (например, Avro). Чтобы правильно сделать это, используйте что-то вроде Replicator или MirrorMaker2 и ByteArrayConverter.

Ссылка: https://rmoff.net/2019/09/29/copying-data-between-kafka-clusters-with-kafkacat/

...