У меня есть Flink
потоковая система, где я получаю данные о потоке кликов.
формат данных:
{"uid":"123", "event_type":"view","payload":{"p1":{"price":23}}}
{"uid":"123", "event_type":"view","payload":{"p2":{"price":25}}}
{"uid":"123", "event_type":"a2c","payload":{"p2"}}
{"uid":"123", "event_type":"a2c","payload":{"p1":{}}}
Итак, как мы видим, мы получаем два типа событий a2c
и view
. Разница между этими двумя типами событий заключается в view
цена события есть для всех продуктов, а в a2c
есть только имя, цена не там.
Чего я хочу добиться, так это создать консолидированный полезная нагрузка из всех событий в течение 10 минут. Также обогатите полезную нагрузку a2c
, получив price
информацию из соответствующего события view
.
консолидированная полезная нагрузка после завершения работы окна для каждого идентификатора пользователя:
{
"uid":"123",
"all":[
{"event_type":"view", "payload":{"p1":{"price":23}}},
{"event_type":"view","payload":{"p2":{"price":25}}},
{"event_type":"a2c","payload":{"p2":{"price":25}}},
{"event_type":"a2c","payload":{"p1":{"price":23}}}
],
"total":4
}
Как мне достичь это ? По сути, мне нужно поддерживать состояние всех событий view
в окне, а затем, как только я получу событие a2c
, мне нужно получить цену из этого состояния. Я не спрашиваю о каком-либо рабочем решении, просто как сохранить состояние всех событий view
в окне. Также у меня есть несколько пользовательских операций Reduce.
events.keyBy("uid").window(..).reduce(new ReduceCustomFun(..)).uid("..").name("..");
в ReduceCustomFun
: я объединяю данные двух событий в список.