как сгенерировать результат обработки события без задержки во флинке - PullRequest
1 голос
/ 26 июня 2019

Мы рассматриваем возможность использования flink для использования, но не уверены, подходит ли ему flink. Вот мой случай использования. Когда приходит событие e1, нам нужно обработать его и выдать результат. Источник и приемник не имеют отношения к этому обсуждению, но вы можете рассматривать службу очереди сообщений как источник и приемник. Вся обработка события не зависит от других событий. Поэтому при обработке события e1 нам не нужно e2 или любое другое событие. В рамках обработки нам нужно выполнить шаги 1, 2, 3, 4, как показано на диаграмме ниже. Обратите внимание, что шаг 2 и шаг 3 должны выполняться параллельно.

enter image description here

Задержка обработки события очень важна для нас. Поэтому мне нужно выдать результат, как только обработка для этого элемента будет завершена, вместо ожидания некоторого времени ожидания окна. С моими ограниченными знаниями в Flink я мог думать только о следующем подходе

DataStream<Map<String, Object>> step1 = env.addSource(...);
DataStream<Map<String, Object>> step2 = step1.map(...);
DataStream<Map<String, Object>> step3 = step1.map(...);

Теперь, как мне объединить результаты шагов 2 и 3 и выдать результат? В этом простом примере у меня есть только две пары для слияния, но их также может быть больше 2. Я мог бы сделать объединение потоков. У меня может быть уникальный идентификатор события для группировки выходов промежуточных шагов, связанных с конкретным событием.

DataStream<Map<String, Object>> mergedStream = step1.union(step2).keyBy(...);

Но как получить результат? В идеале я хотел бы сказать «испустить результат, как только я получу вывод с шага 2 и шага 3 для определенного ключа» вместо «испускать результат каждые 30 миллисекунд». У последнего есть две проблемы: он может генерировать частичные результаты и имеет задержку. Есть ли способ указать первое? Я изучаю Flink, но я готов рассмотреть другие альтернативы, если это решит мой вариант использования.

1 Ответ

2 голосов
/ 27 июня 2019

На шаге 1 добавьте идентификатор события. Затем, после объединения, введите поток с идентификатором события и используйте RichFlatMapFunction, чтобы объединить результаты шагов 2 и 3 обратно в одно событие. Если шаги 2 и 3 генерируют события типа EnrichedEvent, тогда шаг 4 может быть:

static class FanIn extends RichFlatMapFunction<EnrichedEvent, EnrichedEvent> {
    private transient ValueState<EnrichedEvent> enrichmentResponseState;

    @Override
    public void flatMap(EnrichedEvent value, Collector<EnrichedEvent> out) throws Exception {
        EnrichedEvent response = enrichmentResponseState.value();

        if (response != null) {
            response = response.combine(value);
        } else {
            response = value;
        }

        if (response.isComplete()) {
            out.collect(response);
            enrichmentResponseState.clear();
        } else {
            enrichmentResponseState.update(response);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<EnrichedEvent> fanInStateDescriptor =
            new ValueStateDescriptor<>( "enrichmentResponse",
                TypeInformation.of(new TypeHint<EnrichedEvent>() {})
            );

        enrichmentResponseState = getRuntimeContext().getState(fanInStateDescriptor);
    }
}

После этого просто отправить окончательный результат в приемник.

...