У меня есть следующее:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
Сообщения в потоке B должны быть обогащены данными из таблицы A.
Пример данных:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
В идеальноммир, я хотел бы сделать
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
К сожалению, это не работает для меня, потому что мои данные таковы, что каждый раз, когда сообщение записывается в тему A, соответствующее сообщение также пишется в тему B (источник - это одна транзакция БД)Теперь после этой первоначальной транзакции «создание» тема B будет продолжать получать больше сообщений.Иногда по теме B может отображаться несколько событий в секунду, но также возможно иметь последовательные события с разницей в несколько часов для данного ключа.
Причина, по которой простое решение не работает, заключается в том, что исходная транзакция создания вызываетусловие гонки: раздел A и B получают свое сообщение почти одновременно, и, если сообщение B достигает части топологии «join» первым (скажем, за несколько мс до того, как туда попадает сообщение A), tableA еще не будет содержать соответствующую запись.На данный момент событие потеряно.Я вижу, что это происходит в теме C: некоторые события появляются, некоторые нет (если я использую leftJoin, все события появляются, но некоторые имеют нулевой ключ, который эквивалентен потере).Это только проблема для начальной транзакции создания.После этого каждый раз, когда происходит событие в теме B, соответствующая запись существует в таблице A.
Поэтому мой вопрос: как вы это исправите?
Мое текущее решение безобразно.Что я делаю, так это то, что я создал «коллекцию B» и прочитал тему B, используя
B.groupByKey()
.aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
.join(tableA, ...);
Теперь у нас есть соединение KTable-KTable, которое не подвержено этому состоянию гонки.Причина, по которой я считаю это «уродливым», заключается в том, что после каждого объединения я должен отправить специальное сообщение обратно в тему B, в котором, по сути, говорится «удалить событие (я), которое я только что обработал, из коллекции».Если это специальное сообщение не будет отправлено в тему B, коллекция будет продолжать расти, и о каждом событии в коллекции будет сообщаться при каждом соединении.
В настоящее время я изучаю, будет ли работать оконное объединение (см. Aи B в KStreams и использовать оконное соединение).Я не уверен, что это будет работать, потому что нет верхней границы размера окна.Я хочу сказать, «окно начинается за 1 секунду до» и заканчивается бесконечно долго «после».Даже если я каким-то образом смогу сделать эту работу, я немного обеспокоен требованием пространства для неограниченного окна.
Любое предложение будет с благодарностью.