TopologyTestDriver отправляет неверное сообщение на агрегаты KTable - PullRequest
0 голосов
/ 25 января 2019

У меня есть топология, которая агрегируется в KTable. Это общий метод, который я создал, чтобы построить эту топологию на разные темы.

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}

Это работает довольно хорошо при использовании Kafka, но не при тестировании через `TopologyTestDriver`.

В обоих случаях, когда я получаю обновление, сначала вызывается subtractor, а затем вызывается adder. Проблема заключается в том, что при использовании TopologyTestDriver для обновления отправляются два сообщения: одно после вызова subtractor, а другое после вызова adder. Не говоря уже о том, что сообщение, отправленное после subrtractor и до adder, находится на неправильной стадии.

Кто-нибудь еще может подтвердить, что это ошибка? Я проверил это для обеих версий Кафки 2.0.1 и 2.1.0.

РЕДАКТИРОВАТЬ:
Я создал тестовый пример в github для иллюстрации проблемы: https://github.com/mulho/topology-testcase

1 Ответ

0 голосов
/ 26 января 2019

Ожидается, что будут две выходные записи (одна запись «минус» и одна запись «плюс»).Немного сложно понять, как это работает, поэтому позвольте мне попытаться объяснить.

Предположим, у вас есть следующая таблица ввода:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <10,3>
  C  |  <11,4>

На KTable#groupBy() вы извлекаете первую частьзначение как новый ключ (т. е. 10 или 11), а затем сумма второй части (т. е. 2, 3, 4) в агрегации.Поскольку записи A и B имеют новый ключ 10, вы суммируете 2+3, а для нового ключа 11 вы также суммируете 4.Таблица результатов будет выглядеть следующим образом:

 key |  value
-----+---------
  10 |  5
  11 |  4

Теперь предположим, что запись обновления <B,<11,5>> изменит исходный вход KTable на:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <11,5>
  C  |  <11,4>

Таким образом, новая таблица результатов должна суммироваться 5+4 для 11 и 2 для 10:

 key |  value
-----+---------
  10 |  2
  11 |  9

Если вы сравните первую таблицу результатов со второй, вы можете заметить, что обе строки получили обновление.Старая запись B|<10,3> вычитается из 10|5, что приводит к 10|2, а новая запись B|<11,5> добавляется к 11|4, что приводит к 11|9.

Это именно две выходные записи, которые выувидеть.Первая выходная запись (после выполнения вычитания) обновляет первую строку (она вычитает старое значение, которое больше не является частью результата агрегации), а вторая запись добавляет новое значение к результату агрегации.В нашем примере запись вычитания будет <10,<null,<10,3>>>, а запись добавления будет <11,<<11,5>,null>> (формат этих записей - <key, <plus,minus>> (обратите внимание, что запись вычитания устанавливает только часть minus, в то время как запись добавления толькоplus часть).

Заключительное замечание: невозможно объединить записи с плюсом и минусом, поскольку ключ записи с плюсом и минусом может быть разным (в нашем примере 11 и 10), и, следовательно, может входить в разные разделы. Это означает, что операция «плюс» и «минус» может выполняться на разных машинах, и, следовательно, невозможно создать только одну запись, содержащую часть плюс и минус.

...