Понимание задержки и водяных знаков во временном окне Флинка - PullRequest
1 голос
/ 18 марта 2020

Я пытаюсь объединить два типа (скажем, события A и B) событий во Flink. Я хочу подтвердить правильность моего понимания. Некоторые из свойств событий -

  1. Событие A мгновенно мигает с задержкой в ​​несколько минут (5-10 минут)
  2. Событие B протекает с небольшой задержкой 15-30 минут
  3. Существует соединение 1: 1 между событием A и событием B

Я настроил поток данных события A с BoundedOutOfOrdernessTimestampExtractor, равным 10 минутам, и поток данных события B с 30 минутами. Позже я присоединяюсь к временному окну, используя Table API.

Правильно ли мое понимание относительно следующего -

  1. События обрабатываются и объединяются, как только они получены, если они находятся в пределах окна задержки (10 минут для события A и 30 минут для события B). Сквозная задержка не имеет минимальных ограничений из-за какой-либо конфигурации Flink.
  2. Таблица будет удерживать события максимум 30 минут, пока водяные знаки не поступят из обоих потоков. Позже, основываясь на водяных знаках, события очищаются
  3. Конфигурация запроса в приведенном ниже коде избыточна и не требуется

Любые другие предложения по поводу кода ниже?

queryConfig.withIdleStateRetentionTime(
    org.apache.flink.api.common.time.Time.seconds(1),
    org.apache.flink.api.common.time.Time.minutes(30))

val stream: DataStream[Any] = textStream.flatMap(json => convert(json))

val aStream: DataStream[ClassA] =
    stream
        .filter(obj => obj.isInstanceOf[ClassA])
        .rebalance
        .map(obj => obj.asInstanceOf[ClassA])
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[ClassA](
                Time.minutes(10)){
                override def extractTimestamp(element: ClassA): Long =
                    element.serviceTimestamp.toInstant.toEpochMilli
            })

val bStream: DataStream[ClassB] =
    stream
            .filter(obj => obj.isInstanceOf[ClassB])
            .rebalance
            .map(obj => obj.asInstanceOf[ClassB])
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor[ClassB](
                    Time.minutes(30)){
                    override def extractTimestamp(element: ClassB): Long =
                        element.timestamp.toInstant.toEpochMilli
                })

val aTable: Table  = tableEnv.fromDataStream[ClassA](aStream,
    // The .rowtime is for setting event time attributes
    'aTimestamp.rowtime as 'aTimestamp, 'aUniqueId, 'aItem)

val bTable: Table  = tableEnv.fromDataStream[ClassB](bStream,
    // The .rowtime is for setting event time attributes
    // https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
    'bTimestamp.rowtime as 'bTimestamp, 'uniqueId, 'bItem)

val result: Table = aTable
        .join(aTable)
        .where('aUniqueId === 'uniqueId
                // Give ClassB events 30 minutes lateness.
                // Use a time window join as optimization - https://stackoverflow.com/a/51620821
                // & https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#time-windowed-joins
                // Both time clauses are need to qualify as time window join
                && 'bTimestamp >= 'aTimestamp
                && 'bTimestamp <= 'aTimestamp + 30.minutes)
        // DO NOT change order without changing order in later parsing code
        .select('uniqueId, 'aItem, 'bItem, 'bTimestamp, 'aTimestamp.cast(createTypeInformation[Timestamp]))

val outputStream: DataStream[ClassC]  = tableEnv
                .toAppendStream[(String, String, String, Timestamp, Timestamp)](result)
                // TODO find better way to map to a POJO
                .map(row => ClassCUtils.toClassC(row))

1 Ответ

0 голосов
/ 18 марта 2020

События обрабатываются и объединяются, как только они получены, если они находятся в пределах окна задержки (10 минут для события A и 30 минут для события B). Конечная задержка не имеет минимальных ограничений из-за какой-либо конфигурации Flink.

Это верно. События будут отображаться и фильтроваться по мере их поступления и помещаться в буфер для удовлетворения требований окна объединения.

Таблица будет удерживать события в течение максимум 30 минут до появления водяных знаков прибыть из обоих потоков. Позже, основываясь на водяных знаках, события очищаются

Это верно. IntervalJoinOperator будет получать события как с правой, так и с левой стороны соединения, проверять, находятся ли они в пределах времени, и, если да, отправлять их вниз по течению:

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

Конфигурация запроса в приведенный ниже код является избыточным и не требуется

Это верно. withIdleStateRetentionTime имеет значение, когда вы используете неограниченные операторы, такие как предложение GROUP BY в SQL без атрибутов windows.

...