События обрабатываются и объединяются, как только они получены, если они находятся в пределах окна задержки (10 минут для события A и 30 минут для события B). Конечная задержка не имеет минимальных ограничений из-за какой-либо конфигурации Flink.
Это верно. События будут отображаться и фильтроваться по мере их поступления и помещаться в буфер для удовлетворения требований окна объединения.
Таблица будет удерживать события в течение максимум 30 минут до появления водяных знаков прибыть из обоих потоков. Позже, основываясь на водяных знаках, события очищаются
Это верно. IntervalJoinOperator
будет получать события как с правой, так и с левой стороны соединения, проверять, находятся ли они в пределах времени, и, если да, отправлять их вниз по течению:
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
Конфигурация запроса в приведенный ниже код является избыточным и не требуется
Это верно. withIdleStateRetentionTime
имеет значение, когда вы используете неограниченные операторы, такие как предложение GROUP BY
в SQL без атрибутов windows.