Как преобразовать пару данных m: n в таблицу: m? - PullRequest
1 голос
/ 22 сентября 2019

Допустим, мы собираем статистику совместимости моделей автомобилей и шин.Входной поток перечисляет модель автомобиля в качестве ключа и совместимые модели шин в качестве значения:

тема шин автомобиля :

car1 -> [tire1, tire2, tire3]
car2 -> [tire2, tire4]

Желаемый конечный результат -таблица с моделью шины в качестве ключа и количеством моделей автомобилей, которые совместимы с этой шиной:

таблица подсчета шин :

tire1 -> 1
tire2 -> 2
tire3 -> 1
tire4 -> 1

Модели шинпрекратить иногда.Затем они удаляются из списка совместимости:

тема автомобильных шин :

car1 -> [tire2, tire3]

(«шина1» удалена).С другой стороны, новые модели шин поступают на рынок и добавляются в список совместимости:

тема шин для автомобилей :

car1 -> [tire2, tire3, tire5]

Как можноЯ достигаю этого преобразования, используя Kafka Streams DSL?

Мой подход # 1 В carTireStream.flatTransform() я получаю старый список совместимости шин из государственного хранилища tire-car-table.Для каждой модели шины, отсутствующей в новом значении (удалено), я создаю запись с помощью составного ключа:

{carId, tireId} -> null

Для каждой модели шины, отсутствующей в старом списке (добавлено), я выдаю

{carId, tireId} -> 1

запись.Этот поток затем агрегируется по ключу в car-tire-diff-table.Эта таблица содержит только допустимые комбинации моделей автомобилей и шин.Все снятые с производства комбинации автомобилей и шин удаляются с помощью записей значений null.

После этого эта таблица группируется по модели шины (путем извлечения модели шины из составного ключа).Функции агрегатора групп и вычитания создают списки, добавляя / удаляя модель автомобиля из / в список моделей автомобиля.В результате получается tire-car-table:

tire2 -> [car1, car2]
tire3 -> [car1]
tire4 -> [car2]
tire5 -> [car1]

(Шина1 удалена, помните?)

Последний шаг прост.Я применяю .mapValues() к этой таблице и выдаю длину списка.

У этого подхода есть несколько минусов:

  • Это не чистый поток DSL.
  • Мне нужно получить доступ к хранилищу состояний tire-car-table до того, как оно будет декларативно представлено.
  • Требуется промежуточное значение car-tire-diff-table.
  • Значения car-tire-diff-table используются только для сопоставленияоперации создания / удаления (1 / null).

Мой подход # 2 Используя API процессора, я могу избежать промежуточной таблицы и странного создания / удалениязначения отслеживания.

Процессор потребляет car-tire-topic.Он читает старое значение из car-tire-table и сохраняет новое значение в car-tire-table (таким образом, обновляя его).Наличие старого и нового списка совместимости шин в одном месте: для каждой добавленной модели шины она считывает из tire-car-table, добавляет модель автомобиля в список и записывает значение;для каждой удаленной модели шины она считывает из tire-car-table, удаляет модель автомобиля и записывает обновленный список обратно.

Недостатки этого подхода:

  • * tire-car-tableдолжна быть глобальная таблица.Теперь есть способ, которым я мог бы создать алгоритм, который мог бы сопоставить любую возможную комбинацию автомобиля / шины с одним разделом, в то же время имея несколько разделов.
  • Процессор содержит много операций.
  • Я вижуникоим образом, как я могу реализовать последний шаг подсчета с помощью Stream DSL.Я могу создать экземпляр хранилища состояний для tire-car-tabble, используя StreamsBuilder.addStateStore(), но я не нашел способа создать экземпляр KTable из этого.

Ограничение: это обобщение исходной проблемы,Я не могу обойти проблему, изменив исходную тему.Или добавив «Шина была удалена из отмеченного» потока и просто удалите запись шины из tire-car-table.

Все было бы проще, если бы API KTable выставил какой-то видобработчик обновления, который получит старое и новое значение.

Может кто-нибудь придумать более элегантный подход для решения проблемы, чем мой?

1 Ответ

1 голос
/ 23 сентября 2019

Если вам нужно полное транспонированное отображение, я не думаю, что вы можете сделать намного лучше, чем подход № 1.Как вы указали, у вас есть два шага с состоянием с разными ключами, поэтому вы должны выполнить операцию, по крайней мере, в два шага для поддержки нескольких разделов.

Если все, что вам нужно, это конечный счет, вы можете flatTransform вашисходный carTireStream поток либо в tireId -> 1 для новой записи шины, либо в tireId -> -1 для записи шины, которая была удалена (используя ваш tire-car-table государственный магазин), затем ...

tireDeltaStream
  .groupByKey()
  .reduce((oldCount, delta) -> oldCount + delta)

Вы сейчасиметь таблицу с последним количеством автомобилей на шину, которую вы можете запросить (если вы дадите ей имя) или записать в поток.

Если вы хотите сделать все это с использованием DSL высокого уровня,единственный способ, о котором я мог подумать, - это заменить вызов flatTransform на aggregate в потоке carTireStream, сохранив последний список шин и список дельт, а затем flatMapизвлечь дельты.

Например, следующее сообщение по теме carTireStream

car1 -> [tire1, tire2, tire3]

будет преобразовано агрегатом в ...

car1 -> ([tire1, tire2, tire3], [tire1 -> 1, tire2 -> 1, tire3 -> 1])

, котороепосле flatMap, который извлек дельтыld be ...

tire1 -> 1
tire2 -> 1
tire3 -> 1

Тогда следующее сообщение по теме carTireStream

car1 -> [tire2, tire3, tire5]

будет преобразовано агрегатом в ...

car1 -> ([tire2, tire3, tire5], [tire1 -> -1, tire5 -> 1])

, который после flatMap будет ...

tire1 -> -1
tire5 -> 1

Такой подход был бы намного чище, если бы метод агрегирования имел механизм для выдачи значений, отличных от текущего значения внутреннего состояния,В этом случае вы просто сохраните последний список идентификаторов шин и выдадите значения tireId -> delta, что избавит вас от необходимости сохранять дополнительное состояние и иметь дополнительный шаг flatMap.

...