Вы можете использовать MirrorMaker (версия 1), который поставляется с Kafka. Этот инструмент в основном используется для репликации данных из одного центра обработки данных в другой. Он основан на предположении, что имена topi c остаются одинаковыми в обоих кластерах.
Однако вы можете указать свой собственный MessageHandler
, который переименовывает topi c.
package org.xxx.java;
import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* An example implementation of MirrorMakerMessageHandler that allows to rename topic.
*/
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
private final String newName;
public TopicRenameHandler(String newName) {
this.newName = newName;
}
public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
}
}
Я использовал следующие зависимости в моем pom.xml
файле
<properties>
<kafka.version>2.5.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
Скомпилируйте приведенный выше код и обязательно добавьте свой класс в CLASSPATH
export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
Теперь, вместе с некоторыми базами c consumer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest
и producer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-producer
вы можете позвонить kafka-mirror-maker
, как показано ниже
kafka-mirror-maker --consumer.config /path/to/consumer.properties \
--producer.config /path/to/producer.properties \
--num.streams 1 \
--whitelist="topicToBeRenamed" \
--message.handler org.xxx.java.TopicRenameHandler \
--message.handler.args "newTopicName"
Обратите внимание на следующие два предостережения при использовании этого подхода:
- Поскольку вы планируете изменить количество разделов, порядок сообщений в новом topi c может отличаться от старого topi c. По умолчанию сообщения в Kafka разделяются с помощью ключа.
- Использование MirrorMaker не скопирует ваши существующие смещения в старом topi c, а начнет записывать новые смещения. Таким образом, не будет (почти) никакой связи между смещениями от старого topi c к смещениям от нового topi c.