Kafka GroupTable тестирует генерацию дополнительных сообщений при использовании ProcessorTopologyTestDriver - PullRequest
0 голосов
/ 19 октября 2018

Я написал поток, который принимает сообщения и отправляет таблицу появившихся ключей.Если что-то появится, будет показано число 1. Это упрощенная версия моего производственного кода, чтобы продемонстрировать ошибку.В режиме реального времени сообщение отправляется для каждого полученного сообщения.

Однако, когда я запускаю его в модульном тесте с использованием ProcessorTopologyTestDriver, я получаю другое поведение.Если ключ, который уже был просмотрен ранее, получен, я получаю дополнительное сообщение.

Если я отправляю сообщения с ключами «key1», затем «key2», затем «key1», я получаю следующий вывод.

key1 - 1
key2 - 1
key1 - 0
key1 - 1

По какой-то причине он уменьшает значение перед добавлением. Это происходит только при использовании ProcessorTopologyTestDriver.Это ожидается?Есть ли работа вокруг?Или это ошибка?

Вот моя топология:

final StreamsBuilder builder = new StreamsBuilder();
    KGroupedTable<String, String> groupedTable
            = builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));

    KTable<String, Long> countTable = groupedTable.count();

    KStream<String, Long> countTableAsStream = countTable.toStream();
    countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));

Вот мой код модульного теста:

TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
    Topology topology = top.get();
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());

    ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key2", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0.  If I pull another message, it shows key1 with a count of 1

Вот репозиторий полного кода:

https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/

Топология потока: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/main/java/com/nick/kstreams/TopologyWithGroupedTable.java

Код теста: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/test/java/com/nick/kstreams/TopologyWithGroupedTableTests.java

1 Ответ

0 голосов
/ 20 октября 2018

Это не ошибка, а поведение по замыслу (см. Пояснение ниже).

Разница в поведении обусловлена ​​KTable кэшированием в хранилище состояний (ср. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html). При запускеВ модульном тесте кэш сбрасывается после каждой записи, а в производственном цикле это не так. Если вы отключите кэширование в производственном цикле, я предполагаю, что он ведет себя так же, как в модульном тесте.

Дополнительное замечание: ProcessorTopologyTestDriver является внутренним классом и не является частью общедоступного API. Таким образом, нет гарантии совместимости. Вместо него следует использовать официальные пакеты для юнит-теста: https://docs.confluent.io/current/streams/developer-guide/test-streams.html

Почему вы видите две записи:

В своем коде вы используете KTable#groupBy(), а в вашем конкретном случае использования вы не меняете ключ. Однако вВообще, ключ может быть изменен (в зависимости от значения входного значения KTable. Таким образом, если изменяется входное значение KTable, нижестоящее агрегирование должно удалить / вычесть старую пару ключ-значение из результата агрегирования, идобавить новый ключ-ваСочетание пары с результатом агрегации - в общем случае ключи старой и новой пары различны, и, следовательно, требуется создать две записи, поскольку вычитание и сложение могут происходить в разных экземплярах, поскольку разные ключи могут хэшироваться по-разному.Имеет ли это смысл?

Таким образом, для каждого обновления ввода KTable два обновления два результата KTable обычно для двух разных пар ключ-значение должны быть вычислены.Для вашего конкретного случая, в котором ключ не изменяется, Kafka Stream делает то же самое (для этого случая нет проверки / оптимизации, чтобы «объединить» обе операции в одну, если ключ на самом деле один и тот же).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...