Объединение перебалансированного раздела - PullRequest
0 голосов
/ 01 июля 2019

В качестве одного из моих последних шагов в потоковом приложении я хочу отсортировать неупорядоченные события в системе. Для этого я использовал:

events.keyBy((Event event) -> event.id)
                .process(new SortFunction())
                .print();

Где sort Функция:

public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
        private ValueState<PriorityQueue<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
                    }));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
            TimerService timerService = context.timerService();

            if (context.timestamp() > timerService.currentWatermark()) {
                PriorityQueue<Event> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }
                queue.add(event);
                queueState.update(queue);
                timerService.registerEventTimeTimer(event.timestamp);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
            PriorityQueue<Event> queue = queueState.value();
            Long watermark = context.timerService().currentWatermark();
            Event head = queue.peek();
            while (head != null && head.timestamp <= watermark) {
                out.collect(head);
                queue.remove(head);
                head = queue.peek();
            }
        }
    }

То, что я сейчас пытаюсь сделать, это попытаться парализовать это. Моя текущая идея - сделать следующее:

    events.keyBy((Event event) -> event.id)
                    .rebalance()
                    .process(new SortFunction()).setParalelism(3)
                    .map(new KWayMerge()).setParalelism(1).
                    .print();

Если то, что я понимаю, правильно, что должно произойти в этом случае, и исправьте меня, если я ошибаюсь, это то, что раздел каждого из Событий для данного ключа (в идеале 1/3) будет идти к каждому из параллельные экземпляры SortFunction, в этом случае для полной сортировки мне нужно создать map или другой processFunction, который получает отсортированные события из 3 различных экземпляров и объединяет их вместе.

Если это так, есть ли способ отличить происхождение события, полученного map, чтобы я мог выполнить трехстороннее объединение на map? Если это невозможно, моей следующей идеей будет поменять PriorityQueue на TreeMap и поместить все в окно, чтобы слияние происходило в конце окна после получения 3 TreeMaps. Имеет ли смысл этот другой вариант в случае, если вариант a нежизнеспособен или существует лучшее решение, чтобы сделать что-то подобное?

1 Ответ

0 голосов
/ 01 июля 2019

Прежде всего, вы должны знать, что использование PriorityQueue или TreeMap в Flink ValueState - хорошая идея, если и только если вы используете бэкэнд состояния на основе кучи. В случае RocksDB это будет работать довольно плохо, поскольку PriorityQueues будет десериализоваться при каждом доступе и повторно сериализироваться при каждом обновлении. В общем, мы рекомендуем сортировку на основе MapState, и именно так сортировка реализована в библиотеках Флинка.

Что будет делать этот код

events.keyBy((Event event) -> event.id)
            .process(new SortFunction())

- это независимая сортировка потока по ключам - выходные данные будут отсортированы по каждому ключу, но не глобально.

С другой стороны, это

events.keyBy((Event event) -> event.id)
                .rebalance()
                .process(new SortFunction()).setParalelism(3)

не будет работать, потому что результат перебалансировки больше не является KeyedStream, а функция SortFunction зависит от состояния ключа.

Более того, я не верю, что выполнение 3-х видов 1/3 потока и последующее слияние результатов будет заметно лучше, чем единственная глобальная сортировка. Если вам нужно выполнить глобальную сортировку, вы можете рассмотреть возможность использования Table API вместо этого. См. ответ здесь для примера.

...