Потоки Кафки не позволяют создавать потоки с исходными и выходными темами из разных кластеров Кафки.Поэтому следующий код не будет работать для вас
streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)
, в этом случае он ожидает, что outputTopicName находится в том же кластере, что и раздел sourceTopicName.
В качестве обходного пути ,чтобы отправлять сообщения в тему вывода из другого кластера, вы можете использовать дополнительно созданный KafkaProducer со свойством bootstrap.servers
, которое будет указывать на внешний кластер, и KStream.foreach()
метод .
streamsBuilder.stream(sourceTopicName)
.filter((key, value) -> ..)
.foreach((key, value) ->
sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);
public static void sendMessage(KafkaProducer<String, String> kafkaProducer,
String destinationTopicName, String key, String value) {
try {
kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
} catch (RuntimeException ex) {
log.error(errorMessage, ex);
}
}
Другой вариант - создать тему вывода в вашем кластере Kafka, в которой будут отфильтрованные сообщения, и настроить Зеркальное отображение Кафки между двумя кластерами (поэтому сообщения будут скопированы из одной темы).на секунду из другого кластера).