Как Flink обрабатывает метки времени внутри итерационных циклов? - PullRequest
0 голосов
/ 08 июня 2019

Как обрабатываются временные метки в итеративном цикле DataStream в Flink?

Например, вот пример простого итеративного цикла в Flink, в котором цикл обратной связи отличается от входного потока:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

Мои вопросы вращаются вокруг того, как Flink использует временные метки в цикле обратной связи:

  • В пределах ConnectedIterativeStreams, как Flink обрабатывает упорядочение входных объектов в потокахрегулярные входы и объекты обратной связи?Если я отправлю объект в цикл обратной связи, когда его увидит глава цикла относительно обычного потока входных объектов?
  • Как изменяется поведение при использовании обработки времени события?

1 Ответ

0 голосов
/ 10 июня 2019

AFAICT, Flink не дает никаких гарантий по упорядочению входных объектов. Я столкнулся с этим, когда пытался использовать итерации для алгоритма кластеризации во Flink, где обновления центроидов не обрабатываются своевременно. Единственное решение, которое я нашел, состояло в том, чтобы по существу создать единый (объединенный) поток входящих событий и обновлений центроида по сравнению с использованием совместного потока.

К вашему сведению это предложение для устранения некоторых недостатков итераций.

...