Использование API процессора kafka (не DSL) для чтения из источника topi c и записи в целевой topi c он отлично работает для настройки одного кластера kafka (то есть, если и исходная, и целевая темы находятся в одном кластер), но когда исходная и целевая темы находятся в разных кластерах kafka, я получаю исключение NullPointerException для контекста целевого процессора
Topology
topology.addSource("mySource", "SourceTopic");
topology.addProcessor("SourceStreamProcessor",()->new SourceStreamProcessor(), "mySource");
topology.addProcessor("TargetProcessor",()->new TargetProcessor(), "Target");
topology.addSink("sink1","OUTPUT_TOPIC1","TargetProcessor");
topology.addSink("sink2","OUTPUT_TOPIC2","TargetProcessor");
Properties sourceProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SourceStreamProcessor"); // Kafka Cluster 1
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_Cluser_xx.org:9092");
Properties targetProcessorProps = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "targetStreamProcessor"); // Kafka Cluster 2
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test_Cluser_xx.org:9092");
Как мы можем использовать API процессора потоков kafka для записи из одной топи c в одну кластер на другой топи c в другом кластере?