Недавно я начал изучать Spring Cloud Stream для Kafka и изо всех сил пытался заставить TestBinder работать с Kstreams. Это известное ограничение или я что-то упустил?
Это прекрасно работает:
Строковый процессор:
@StreamListener(TopicBinding.INPUT)
@SendTo(TopicBinding.OUTPUT)
public String process(String message) {
return message + " world";
}
Струнный тест:
@Test
@SuppressWarnings("unchecked")
public void testString() {
Message<String> message = new GenericMessage<>("Hello");
topicBinding.input().send(message);
Message<String> received = (Message<String>) messageCollector.forChannel(topicBinding.output()).poll();
assertThat(received.getPayload(), equalTo("Hello world"));
}
Но когда я пытаюсь использовать KStream в своем процессе, я не могу заставить TestBinder работать.
процессор Kstream:
@SendTo(TopicBinding.OUTPUT)
public KStream<String, String> process(
@Input(TopicBinding.INPUT) KStream<String, String> events) {
return events.mapValues((value) -> value + " world");
}
Тест KStream:
@Test
@SuppressWarnings("unchecked")
public void testKstream() {
Message<String> message = MessageBuilder
.withPayload("Hello")
.setHeader(KafkaHeaders.TOPIC, "event.sirism.dev".getBytes())
.setHeader(KafkaHeaders.MESSAGE_KEY, "Test".getBytes())
.build();
topicBinding.input().send(message);
Message<String> received = (Message<String>)
messageCollector.forChannel(topicBinding.output()).poll();
assertThat(received.getPayload(), equalTo("Hello world"));
}
Как вы, возможно, заметили, я опустил @StreamListener из процессора Kstream, но без него не похоже, что тестбиндер может найти обработчик. (но при этом он не работает при запуске приложения)
Это известная ошибка / ограничение, или я просто делаю что-то глупое здесь?