У меня есть следующая сигнатура метода bean-компонента процессора:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
return (inputStream1, intputStream2) -> {
intputStream2
.peek((k, v) -> {
log.debug(...);
});
return inputStream1
.mapValues(...)
.branch((k,v) -> true, (k,v) -> true);
};
}
Соответствующие свойства:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
myStream-in-0:
destination: inp0
myStream-in-1:
destination: inp1
myStream-out-0:
destination: out0
myStream-out-1:
destination: out1
Spring Cloud Kafka Stream версия Hoxton.SR4 (spring-cloud-stream-binder -kafka-streams: jar: 3.0.4.RELEASE), встроенный Kafka версии 2.5.0.
Я тестирую свою топологию с помощью встроенного Kafka:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
"inp0", "inp1", "out0", "out1"
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
@Test
public void embeddedKafkaTest() throws IOException, InterruptedException {
Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
latch = new CountDownLatch(1);
// ... publish ...
latch.await(15, TimeUnit.SECONDS);
ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
}
private <K,V> Consumer<K, V> createConsumer(String groupName) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}
Мои тесты показывают, что сообщения от myStream
достигают и попадают в topi c "out0", как и ожидалось, но "out1" topi c остается пустым, и модульный тест не выполняется при втором утверждении.
Я пробовал пару вещей, но похоже, что вывод на второй вывод topi c просто не производится (вывод на первый вывод topi c производится хорошо).
Вы видите какие-нибудь ошибки в моей настройке?
И еще одно: оператор return в определении метода bean-компонента myStream показывает предупреждение компилятора:
Непроверенное создание массива универсальных шаблонов для параметра varargs
Но похоже, что ' s как API-интерфейс Spring Cloud Kafka Stream 3.x требует определения возвращаемого типа?