Как выполнить модульное тестирование API-интерфейсов Kafka Streams и Producer вместе - PullRequest
0 голосов
/ 03 февраля 2020

В настоящее время у меня есть базовое c потоковое приложение Kafka, которое включает топологию только с источником и процессором, но без приемника. По сути, топология обрабатывает только потребление сообщений. Что касается создания сообщений, мы выполняем вызовы к API производителя в экземпляре ProcessorSupplier, передаваемом в топологию, в частности в переопределенном методе process. Хотя я понимаю, что API производителя здесь избыточен, так как я мог бы просто добавить приемник в топологию, я нахожусь в положении, когда мне необходимо настроить свое потоковое приложение таким образом. Что касается тестирования, я опробовал класс TopologyTestDriver, который доступен в пакете kafka-streams-test-utils . Однако я хочу проверить не только топологию, но и вызовы API производителя. Использование TopologyTestDriver требует от меня издеваться над моим экземпляром Producer, поскольку он отделен от Streams API. В результате, поскольку сообщения не «переадресованы», я не могу прочитать сообщения из TopologyTestDriver для своих модульных тестов.

Вот упрощенная версия моего process метода:

@Override
public void process(String key, String value) {
    // some data processing stuff that I leave out for simplicity sake
    String topic = "...";
    Properties props = ...;
    //Producer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
    producer.send(record);
}

И вот упрощение моего примера модульного теста:

@Test
public void process() {
    Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("processor", ..., "source");
    Properties props = ...;

    TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    // the following line will work fine as long as the producer is mocked
    testDriver.pipeInput(factory.create("input-topic", "key", "value"));

    // since the producer is mocked, no message can be read from the output topic
    ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    assertNull(outputRecord); // returns true
}

Подводя итог моему вопросу, есть ли способ написать модульный тест, который проверяет как потребление, так и создание сообщений в топологии, которая использует API производителя для записи сообщений в исходящие темы?

1 Ответ

1 голос
/ 10 февраля 2020

Вы не должны использовать пользовательский Producer, но добавить раковину к Topology. Вызовы Producer.send() являются асин c, и, следовательно, вы можете потерять данные. Чтобы избежать потери данных, вам необходимо выполнить синхронизацию вызова c, ie, получить Future, возвращаемое send(), и дождаться его завершения, прежде чем process() вернется. Однако это сильно влияет на вашу пропускную способность и не рекомендуется.

Если вы добавите приемник, вы избежите этой проблемы, поскольку Kafka Streams теперь будет понимать, какие данные были отправлены на выход topi c, и, таким образом, потеря данных не произойдет, в то время как Kafka Streams может использовать более производительный асинхронный вызов c.

Помимо проблемы с корректностью, создается впечатление, что вы создаете новый KafkaProducer для каждого сообщения, которое вы обрабатываете в своем текущем коде, что довольно неэффективно. Кроме того, использование приемника упростит ваш код. И, конечно же, вы получаете надлежащие возможности тестирования, используя TopologyTestDriver.

...