Я думаю, что что-то неправильно понял с помощью TopologyTestDriver.
До сих пор, когда я разрабатывал приложение Kafka Stream, я подключался к реальной Kafka в тестовой среде, вручную создавал и подпитывал темы и проверялсодержание того, что находится в не по теме.
Чтобы автоматизировать тесты, я посмотрел на TopologyTestDriver
.Согласно документации, опубликованной Confluent (https://docs.confluent.io/current/streams/developer-guide/test-streams.html), похоже, что я ищу:
[...] Обычно вы запускаете топологию с помощью класса KafkaStreams., который подключается к вашему брокеру [...].Для тестирования запуск брокера [...] добавляет много сложности и времени.
Streams предоставляет замену TopologyTestDriver [...] для класса KafkaStreams.Он не имеет внешних системных зависимостей [...] Существуют хуки для проверки данных, отправляемых в выходные темы [...].
Меня беспокоит расположение топологии.В каждом примере, который я видел (в Confluent, на веб-сайте Kafka, ...), топология определяется в тестовом файле:
private TopologyTestDriver testDriver;
....
@Before
public void setup() {
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
.......
// setup test driver
Properties config = new Properties();
....
testDriver = new TopologyTestDriver(topology, config);
// pre-populate store
.....
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void shouldFlushStoreForFirstInput() {
testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
OutputVerifier.compareKeyValue(......);
Assert.assertNull(........);
}
Как я могу реализовать тесты для моих существующих приложений Kafka Streams?Я не могу себе представить, что мне нужно продублировать код моей топологии - одну версию в моем основном файле и одну версию в моем тестовом файле.Это было бы трудно поддерживать.
Мои существующие приложения имеют топологию, определенную непосредственно в методе main
.Я мог бы извлечь топологию из основного, добавив статический метод, такой как protected static org.apache.kafka.streams.Topology buildTopology() {...}
, и я мог бы вызвать этот метод из тестового файла.
Но нет ли более чистого решения для выполненияосновной код из тестового кода?