Допустим, мы собираем статистику совместимости моделей автомобилей и шин.Входной поток перечисляет модель автомобиля в качестве ключа и совместимые модели шин в качестве значения:
тема шин автомобиля :
car1 -> [tire1, tire2, tire3]
car2 -> [tire2, tire4]
Желаемый конечный результат -таблица с моделью шины в качестве ключа и количеством моделей автомобилей, которые совместимы с этой шиной:
таблица подсчета шин :
tire1 -> 1
tire2 -> 2
tire3 -> 1
tire4 -> 1
Модели шинпрекратить иногда.Затем они удаляются из списка совместимости:
тема автомобильных шин :
car1 -> [tire2, tire3]
(«шина1» удалена).С другой стороны, новые модели шин поступают на рынок и добавляются в список совместимости:
тема шин для автомобилей :
car1 -> [tire2, tire3, tire5]
Как можноЯ достигаю этого преобразования, используя Kafka Streams DSL?
Мой подход # 1 В carTireStream.flatTransform()
я получаю старый список совместимости шин из государственного хранилища tire-car-table
.Для каждой модели шины, отсутствующей в новом значении (удалено), я создаю запись с помощью составного ключа:
{carId, tireId} -> null
Для каждой модели шины, отсутствующей в старом списке (добавлено), я выдаю
{carId, tireId} -> 1
запись.Этот поток затем агрегируется по ключу в car-tire-diff-table
.Эта таблица содержит только допустимые комбинации моделей автомобилей и шин.Все снятые с производства комбинации автомобилей и шин удаляются с помощью записей значений null
.
После этого эта таблица группируется по модели шины (путем извлечения модели шины из составного ключа).Функции агрегатора групп и вычитания создают списки, добавляя / удаляя модель автомобиля из / в список моделей автомобиля.В результате получается tire-car-table
:
tire2 -> [car1, car2]
tire3 -> [car1]
tire4 -> [car2]
tire5 -> [car1]
(Шина1 удалена, помните?)
Последний шаг прост.Я применяю .mapValues()
к этой таблице и выдаю длину списка.
У этого подхода есть несколько минусов:
- Это не чистый поток DSL.
- Мне нужно получить доступ к хранилищу состояний
tire-car-table
до того, как оно будет декларативно представлено. - Требуется промежуточное значение
car-tire-diff-table
. - Значения
car-tire-diff-table
используются только для сопоставленияоперации создания / удаления (1
/ null
).
Мой подход # 2 Используя API процессора, я могу избежать промежуточной таблицы и странного создания / удалениязначения отслеживания.
Процессор потребляет car-tire-topic
.Он читает старое значение из car-tire-table
и сохраняет новое значение в car-tire-table
(таким образом, обновляя его).Наличие старого и нового списка совместимости шин в одном месте: для каждой добавленной модели шины она считывает из tire-car-table
, добавляет модель автомобиля в список и записывает значение;для каждой удаленной модели шины она считывает из tire-car-table
, удаляет модель автомобиля и записывает обновленный список обратно.
Недостатки этого подхода:
- *
tire-car-table
должна быть глобальная таблица.Теперь есть способ, которым я мог бы создать алгоритм, который мог бы сопоставить любую возможную комбинацию автомобиля / шины с одним разделом, в то же время имея несколько разделов. - Процессор содержит много операций.
- Я вижуникоим образом, как я могу реализовать последний шаг подсчета с помощью Stream DSL.Я могу создать экземпляр хранилища состояний для
tire-car-tabble
, используя StreamsBuilder.addStateStore()
, но я не нашел способа создать экземпляр KTable
из этого.
Ограничение: это обобщение исходной проблемы,Я не могу обойти проблему, изменив исходную тему.Или добавив «Шина была удалена из отмеченного» потока и просто удалите запись шины из tire-car-table
.
Все было бы проще, если бы API KTable
выставил какой-то видобработчик обновления, который получит старое и новое значение.
Может кто-нибудь придумать более элегантный подход для решения проблемы, чем мой?