Я пытаюсь выполнить модульное тестирование использования kafka с весенней загрузкой, но у меня проблемы с каналом ввода. Ниже приведен отрывок из того, что я делаю.
public interface MyCustomStreamBinding{
@Input
SubscribableChannel consumeChannel();
@Output
MessageChannel produceChannel();
}
@EnableBinding(value = { Source.class, MyCustomStreamBinding.class })
public class StreamConfiguration {
...
}
@Service
public class MyService {
private final MyCustomStreamBinding streamBinding;
public MyService(MyCustomStreamBinding streamBinding) {
this.streamBinding = streamBinding;
}
public void sendMessage() {
streamBinding.produceChannel().send(new SomeObject);
}
@StreamListener("consumeChannel")
public void consumeChannel(SomeObject payload){
// do processing of payload
}
}
Затем в моих тестовых примерах у меня есть
@SpringBootTest(classes = {MyApp.class})
class MyServiceTest {
private MyService myService;
@Autowired
private MyCustomStreamBinding streamBinding;
@Autowired
private MessageCollector messageCollector;
@BeforeEach
public void setup(){
myService = new MyService(streamBinding);
}
@Test
public void TestMessaging(){
myService.sendMessage();
Message<?> m = messageCollector.forChannel(streamBinding.produceChannel()).poll();
assertThat(m.getPayload(), equalTo(new SomeObject()));
}
}
Как я могу проверить takeChannel и что он действительно выполнил обработку, как ожидалось