Скажем так, у нас есть 2 темы
Пользователи {'UserId', {'UserName', 'Age'}
Posts {'PostId', {'UserId', 'PostДанные ',' Дата публикации '}}
Я хочу рассчитать текущее общее количество сообщений по любому имени пользователя на определенную дату.(не userId)
У меня есть потоковое приложение, которое выполняет эту агрегацию.
Пользователь имеет право вносить изменения в любые данные и всякий раз, когда пользователь вносит какие-либо изменения, новыйсобытие регистрируется в Kafka.
В моей логике приложения потоков есть соединение между сообщениями KStream и Users KTable.Однако, возможно, что запись в Users KTable появляется после записи Posts KStream.Топология определяется таким образом, что в случае неудачного соединения (слева) сообщение отправляется в другую тему с именем «Пропущенные сообщения», чтобы я мог выполнить объединение, как только пользователь станет доступен.Если соединение успешно, объединенная структура данных передается в другую тему, скажем, «Объединенные сообщения».
Теперь, чтобы удовлетворить пропущенные сообщения, я выполняю windowedJoin с Users KStream.Поэтому, как только я получаю информацию о пользователе, я присоединяю ее к соответствующим пропущенным сообщениям (если таковые имеются) и добавляю объединенные сообщения в тему «Присоединенные сообщения».
Любые последующие обновления в «Публиковать данные»или «Дата публикации» будет корректно добавлена в объединенные сообщения, и логика агрегирования в объединенных сообщениях будет работать правильно.
Теперь давайте просто скажем, что сообщение «Пользователь 1» было отправлено как {'Post1 ', {' 1 января ',' Hello '}, который был пропущен.Сначала его отправляли в пропущенные посты, а затем в присоединенные посты после того, как информация пользователя стала доступна через некоторое время.
Пользователь изменил дату публикации сообщений с 1 января на 2 января.На этот раз соединение было успешным (так как информация о пользователях была в Ktable) и отправлено прямо в Joined-Posts.Таким образом, у нашей объединенной записи KTable появится новая запись {'PostId', {'User1Name', 'Hello', '2nd Jan'}}.Здесь также хорошо работает наша логика агрегирования.
Проблема возникает, когда пользователь вносит изменения в информацию пользователя в течение оконного периода.Теперь, если пользователь меняет свой возраст, событие, созданное в потоковом приложении, снова присоединяется к пропущенным сообщениям и отправляет {PostId, {'UserName', 'Hello', '1st Jan'}} в KTable Joined-Posts какНесвежий пост все еще присутствует в пропущенных постах.Таким образом, новая запись в Ktable заменяется старым устаревшим значением, поддерживаемым потоком пропущенных сообщений.Это испортит логику агрегации.
Надеюсь, это объясняет мою проблему.Я исследовал много вариантов, но все они терпят неудачу так или иначе.Возможно, мне не известны некоторые функции Kafka Streams, которые могут помочь мне в этой ситуации.
Я чувствую, что упускаю что-то действительно очевидное.
Я могу предоставить примеры кода, если кому-то нужно.