Я хочу провести модульное тестирование агрегата Kafka Stream, и я совершенно не понимаю, какой метод использовать.Я читал о TestSupportBinder, но я не думаю, что это работает в моем случае, поэтому я использую метод KafkaEmbedded.Вот как я инициализирую встроенный Kafka.
@Before
public void setUp() throws Exception{
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory<Object, LoggerMessage> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);
}
Я хочу проверить следующее:
public interface Channels {
String LOGGER_IN_STREAM = "logger-topic-in-stream";
String LOGGER_IN = "logger-topic-in";
String LOGGERDATAVALIDATED_OUT = "loggerDataValidated-topic-out";
@Input(Channels.LOGGER_IN)
SubscribableChannel processMessage();
@Input(Channels.LOGGER_IN_STREAM)
KStream<Object, LoggerMessage> loggerKstreamIn();
@Output(Channels.LOGGERDATAVALIDATED_OUT)
MessageChannel validateLoggerData();
}
И я получаю следующее сообщение об ошибке
org.springframework.beans.factory.BeanCreationException: Ошибка создания компонента с именем 'some.domain.Channels': сбой вызова метода init;вложенное исключение - java.lang.IllegalStateException: не найдена фабрика для целевого типа привязки: org.apache.kafka.streams.kstream.KStream среди зарегистрированных фабрик: channelFactory, messageSourceFactory
Причина: java.lang.IllegalStateException:Не найдена фабрика для целевого типа привязки: org.apache.kafka.streams.kstream.KStream среди зарегистрированных фабрик: channelFactory, messageSourceFactory
Что я делаю не так?