Api процессора Kafka Streams позволяет использовать топологию с несколькими кластерами? - PullRequest
0 голосов
/ 18 июня 2020

Использование API процессора kafka (не DSL) для чтения из источника topi c и записи в целевой topi c он отлично работает для настройки одного кластера kafka (то есть, если и исходная, и целевая темы находятся в одном кластер), но когда исходная и целевая темы находятся в разных кластерах kafka, я получаю исключение NullPointerException для контекста целевого процессора

Topology
topology.addSource("mySource", "SourceTopic");
topology.addProcessor("SourceStreamProcessor",()->new SourceStreamProcessor(), "mySource");         

topology.addProcessor("TargetProcessor",()->new TargetProcessor(), "Target");
topology.addSink("sink1","OUTPUT_TOPIC1","TargetProcessor");
topology.addSink("sink2","OUTPUT_TOPIC2","TargetProcessor");


Properties sourceProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SourceStreamProcessor"); // Kafka Cluster 1
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_Cluser_xx.org:9092");

Properties targetProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "targetStreamProcessor"); // Kafka Cluster 2
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test_Cluser_xx.org:9092");

Как мы можем использовать API процессора потоков kafka для записи из одной топи c в одну кластер на другой топи c в другом кластере?

1 Ответ

1 голос
/ 18 июня 2020

Потоки Kafka не поддерживают чтение из одного кластера Kafka и запись в другой кластер Kafka.

Вы можете обрабатывать сообщения в одном кластере, а затем использовать Mirror Maker copy это другому.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...