Поток фильтра Flink на основе детерминированного другого потока - PullRequest
0 голосов
/ 17 июня 2020

У меня есть 2 потока данных во Flink (с общими временными метками и от Kafka), один из которых содержит некоторые значения сигналов, а другой - информацию об активности (простой активный-неактивный). Я пробовал RichCoProcessFunction с простым состоянием private ValueState<Boolean> seen;, и результаты недетерминированы c. Если я использую один и тот же набор данных (с одинаковыми отметками времени), используя startFromEarliest, я иногда получаю разные фильтруемые значения. Как я могу сделать его детерминированным c? Я делюсь своим KeyedCoProcessFunction скелетом ниже.

private ValueState < Boolean > seen;

@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor < Boolean > descriptor = new ValueStateDescriptor < > (
        // state name
        "have-seen-key",
        // type information of state
        TypeInformation.of(new TypeHint < Boolean > () {}));
    seen = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement1(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (seen.value() == Boolean.TRUE) {
        out.collect(value);
    }
}

@Override
public void processElement2(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (value.value == 1) {
        seen.update(Boolean.TRUE);

    } else {
        seen.update(Boolean.FALSE);
    }

}

Ответы [ 2 ]

0 голосов
/ 17 июня 2020

Реализация соединения по времени события нужного вам типа может быть выполнена как RichCoProcessFunction, но это может быть немного сложно. Вы можете предпочесть реализовать это как соединение с функцией темпоральной таблицы .

0 голосов
/ 17 июня 2020

Причина того, что он не детерминирован c, состоит в том, что два источника производят элементы с разной скоростью. Самый простой способ сделать его более детерминированным c - использовать EventTime. Это означает, что вам потребуется назначить временные метки как для контрольных записей, так и для записей данных. Затем Flink будет генерировать водяные знаки для ваших элементов.

Затем вы можете просто буферизовать и подождать с испусканием или отбрасыванием элементов, пока вы не получите водяной знак для потока управления, что означает, что в потоке управления ничего не изменится.

Без временных меток практически невозможно ввести детерминированное c поведение в таком случае, потому что вы никогда не сможете точно сказать, когда данная запись прибыла и какие записи следует удалить, а какие следует испущен.

...