В качестве одного из моих последних шагов в потоковом приложении я хочу отсортировать неупорядоченные события в системе.
Для этого я использовал:
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
.print();
Где sort
Функция:
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
private ValueState<PriorityQueue<Event>> queueState = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
}));
queueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}
}
То, что я сейчас пытаюсь сделать, это попытаться парализовать это. Моя текущая идея - сделать следующее:
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParalelism(3)
.map(new KWayMerge()).setParalelism(1).
.print();
Если то, что я понимаю, правильно, что должно произойти в этом случае, и исправьте меня, если я ошибаюсь, это то, что раздел каждого из Событий для данного ключа (в идеале 1/3) будет идти к каждому из параллельные экземпляры SortFunction
, в этом случае для полной сортировки мне нужно создать map
или другой processFunction
, который получает отсортированные события из 3 различных экземпляров и объединяет их вместе.
Если это так, есть ли способ отличить происхождение события, полученного map
, чтобы я мог выполнить трехстороннее объединение на map
? Если это невозможно, моей следующей идеей будет поменять PriorityQueue
на TreeMap
и поместить все в окно, чтобы слияние происходило в конце окна после получения 3 TreeMaps
. Имеет ли смысл этот другой вариант в случае, если вариант a нежизнеспособен или существует лучшее решение, чтобы сделать что-то подобное?