В настоящее время у меня есть базовое c потоковое приложение Kafka, которое включает топологию только с источником и процессором, но без приемника. По сути, топология обрабатывает только потребление сообщений. Что касается создания сообщений, мы выполняем вызовы к API производителя в экземпляре ProcessorSupplier, передаваемом в топологию, в частности в переопределенном методе process
. Хотя я понимаю, что API производителя здесь избыточен, так как я мог бы просто добавить приемник в топологию, я нахожусь в положении, когда мне необходимо настроить свое потоковое приложение таким образом. Что касается тестирования, я опробовал класс TopologyTestDriver
, который доступен в пакете kafka-streams-test-utils . Однако я хочу проверить не только топологию, но и вызовы API производителя. Использование TopologyTestDriver
требует от меня издеваться над моим экземпляром Producer
, поскольку он отделен от Streams API. В результате, поскольку сообщения не «переадресованы», я не могу прочитать сообщения из TopologyTestDriver
для своих модульных тестов.
Вот упрощенная версия моего process
метода:
@Override
public void process(String key, String value) {
// some data processing stuff that I leave out for simplicity sake
String topic = "...";
Properties props = ...;
//Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
producer.send(record);
}
И вот упрощение моего примера модульного теста:
@Test
public void process() {
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("processor", ..., "source");
Properties props = ...;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
// the following line will work fine as long as the producer is mocked
testDriver.pipeInput(factory.create("input-topic", "key", "value"));
// since the producer is mocked, no message can be read from the output topic
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertNull(outputRecord); // returns true
}
Подводя итог моему вопросу, есть ли способ написать модульный тест, который проверяет как потребление, так и создание сообщений в топологии, которая использует API производителя для записи сообщений в исходящие темы?