Как распределить сообщения между темами Kafka с разной конфигурацией? - PullRequest
2 голосов
/ 04 мая 2020

Ищу способ как распределить сообщения между двумя темами Kafka. В оригинальном topi c у меня есть 20 разделов с 1000000 сообщениями на раздел. Я хочу иметь новый topi c с разделами 1000 и распространять сообщения по новому более широкому диапазону разделов.

1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)

Возможно ли это сделать в Kafka (через topi c зеркалирование или другая техника)?

1 Ответ

1 голос
/ 04 мая 2020

Вы можете использовать MirrorMaker (версия 1), который поставляется с Kafka. Этот инструмент в основном используется для репликации данных из одного центра обработки данных в другой. Он основан на предположении, что имена topi c остаются одинаковыми в обоих кластерах.

Однако вы можете указать свой собственный MessageHandler, который переименовывает topi c.

package org.xxx.java;

import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * An example implementation of MirrorMakerMessageHandler that allows to rename topic.
 */
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
  private final String newName;

  public TopicRenameHandler(String newName) {
    this.newName = newName;
  }

  public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
    return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
  }
}

Я использовал следующие зависимости в моем pom.xml файле

    <properties>
        <kafka.version>2.5.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

Скомпилируйте приведенный выше код и обязательно добавьте свой класс в CLASSPATH

export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar

Теперь, вместе с некоторыми базами c consumer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest

и producer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-producer

вы можете позвонить kafka-mirror-maker, как показано ниже

kafka-mirror-maker --consumer.config /path/to/consumer.properties \
 --producer.config /path/to/producer.properties \
 --num.streams 1 \
 --whitelist="topicToBeRenamed" \
 --message.handler org.xxx.java.TopicRenameHandler \
 --message.handler.args "newTopicName"

Обратите внимание на следующие два предостережения при использовании этого подхода:

  • Поскольку вы планируете изменить количество разделов, порядок сообщений в новом topi c может отличаться от старого topi c. По умолчанию сообщения в Kafka разделяются с помощью ключа.
  • Использование MirrorMaker не скопирует ваши существующие смещения в старом topi c, а начнет записывать новые смещения. Таким образом, не будет (почти) никакой связи между смещениями от старого topi c к смещениям от нового topi c.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...