Кажется, что ошибка происходит, потому что посредник не работает.Но KafkaStreams работает.
Чтобы исправить это, я думаю, что вам нужно запретить запуск KafkaStreams при тестировании.
Аннотация @SpringBootTest предоставляет классы атрибутов.Вы можете указать классы, которые будут зарегистрированы как bean-компоненты
. Вы можете запретить регистрацию связанных bean-компонентов KafkaStreams, указав атрибут классов.
Например, вы можете выполнить тестирование следующим образом.В этом случае описанная выше проблема не возникнет, потому что связанные компоненты KafkaStreams не зарегистрированы.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ServiceImpl.class, CommonConfig.class})
public class SomeClassTest {
@Autowired
private ServiceImpl articleServiceImpl;
// do test
}
Когда вы используете @SpringBootTest, лучше всего регистрировать только те компоненты, которые вы хотите протестировать.
Если вы не знакомы с этим подходом, попробуйте смоделировать bean-компонент, управляющий объектом KafkaStreams
Например, я создаю следующий bean-компонент для запуска и завершения объекта KafkaStreams.
@Component
public class ManageableStream implements DisposableBean, InitializingBean {
private final KafkaStreams kafkaStreams;
public ManageableStream() {
StreamsConfig config = buildStreamConfig();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("source.topic");
inputStream.to("destination.topic");
Topology topology = builder.build();
kafkaStreams = new KafkaStreams(topology, config);
}
@Override
public void destroy() throws Exception {
kafkaStreams.close();
}
@Override
public void afterPropertiesSet() throws Exception {
kafkaStreams.start();
}
private StreamsConfig buildStreamConfig() {
Map<String, Object> properties = new HashMap<>();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-stream-application");
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
return new StreamsConfig(properties);
}
}
При тестировании макет bean-компонента, который управляет объектом KafkaStreams.
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@MockBean
private ManageableStream manageableStream;
@Test
public void contextLoads() {
}
}
Тогда KafkaStreams не запустится, поэтому описанная выше проблема не возникнет.
Если вы хотите проверить топологию KafkaStreamsПожалуйста, смотрите ссылку ниже
https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html
Надеюсь, мой ответ был полезен