Потребление из двух flink dataStream на основе приоритета или циклического перебора - PullRequest
0 голосов
/ 15 января 2020

у меня два флинка dataStream. Например: dataStream1 и dataStream2. Я хочу объединить оба потока в один поток, чтобы я мог обрабатывать их, используя одни и те же функции процесса, поскольку dag обоих dataStream одинаков.

На данный момент мне нужен одинаковый приоритет потребления сообщений для любого потока. Производитель dataStream2 создает 10 сообщений в минуту, а производитель dataStream1 - 1000 сообщений в секунду. Кроме того, dataTypes одинаковы для обоих dataStreams.DataSteam2 более очереди с высоким приоритетом, которые должны использоваться как можно скорее. Нет связи между сообщениями dataStream1 и dataStream2

Будет ли dataStream1.union(dataStream2) генерировать поток, который будет иметь элементы обоих потоков?

Ответы [ 3 ]

1 голос
/ 15 января 2020

Возможно, самым простым решением этой проблемы, но не самым эффективным, в зависимости от точной спецификации источников ваших данных, может быть соединение двух потоков. В этом решении вы можете использовать CoProcessFunction, который будет вызывать отдельные методы для каждого из подключенных потоков.

В этом решении вы можете просто буферизовать элементы одного потока до тех пор, пока они не будут созданы (например, циклически). Но имейте в виду, что это может быть весьма неэффективно, если существует очень большая разница между частотой, с которой источники генерируют события.

0 голосов
/ 15 января 2020

Возможно, вы захотите использовать пользовательский оператор, который реализует интерфейс InputSelectable, чтобы уменьшить объем необходимой буферизации. Ниже приведен пример, который реализует чередование без какой-либо буферизации, но обязательно прочитайте предостережение в документах , которое объясняет, что

... оператор может получить некоторые данные, которые он в данный момент не хочет обрабатывать ...

Другими словами, нельзя полагаться на этот простой пример, чтобы он действительно работал как есть.

public class Alternate {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Long> positive = env.generateSequence(1L, 100L);
        DataStream<Long> negative = env.generateSequence(-100L, -1L);

        AlternatingTwoInputStreamOperator op = new AlternatingTwoInputStreamOperator();

        positive
            .connect(negative)
            .transform("Hack that needs buffering", Types.LONG, op)
            .print();

        env.execute();
    }
}

class AlternatingTwoInputStreamOperator extends AbstractStreamOperator<Long>
        implements TwoInputStreamOperator<Long, Long, Long>, InputSelectable {

    private InputSelection nextSelection = InputSelection.FIRST;

    @Override
    public void processElement1(StreamRecord<Long> element) throws Exception {
        output.collect(element);
        nextSelection = InputSelection.SECOND;
    }

    @Override
    public void processElement2(StreamRecord<Long> element) throws Exception {
        output.collect(element);
        nextSelection = InputSelection.FIRST;
    }

    @Override
    public InputSelection nextSelection() {
        return this.nextSelection;
    }
}

Примечание также, что InputSelectable был добавлен во Flink 1.9.0.

0 голосов
/ 15 января 2020

Похоже, что два DataStream имеют разные типы элементов, хотя вы не указали это явно. Если это так, то создайте Either<stream1 type, stream2 type> через MapFunction для каждого потока, а затем union() для двух потоков. Вы не получите точного смешения двух, поскольку Flink будет чередовать потребление из сетевого буфера каждого потока.

Если вы действительно хотите хорошо смешанные потоки, то (как отметили другие) вам нужно будет буферизовать входящие элементы через состояние, а также применяют некоторую эвристику, чтобы избежать чрезмерной буферизации, если по какой-либо причине (например, из-за различий в задержке сети или, более вероятно, из-за разной производительности между двумя источниками) у вас очень разные скорости передачи данных между двумя потоками.

...