Flink: преобразовать убирающийся SQL в добавляющий SQL, используя только SQL, для подачи во временную таблицу - PullRequest
0 голосов
/ 23 сентября 2019

Я предоставляю пользователям интерфейс Flink SQL, поэтому я не могу использовать интерфейс Table или Java / Scala.Все должно быть указано в SQL.Однако я могу анализировать комментарии в файлах SQL и добавлять указанные специальные инструкции API нижнего уровня.

Как может конвертировать один пользователь, скажем:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b

name: average_source_data_retracting
  b STRING
  average NUMERIC

- что убирает значения-в форме, которая добавит их.Эта добавляющая форма может иметь следующую схему:

name: average_source_data_appending
  flag BOOLEAN <-- indicating an accumulate or retract message
  b STRING
  average NUMERIC

В некотором роде RetractStreamTableSink эквивалентен AppendStreamTableSink, но без использования приемника.

Все эточтобы разрешить использование average_source_data_appending для создания Временная таблица (фильтрация возвращаемых сообщений), но этот тип таблицы принимает только исходные таблицы только для добавления.

Я рассмотрел использование окон ( как говорилось здесь ), но я бы хотел, чтобы обновления во временной таблице были мгновенными.

1 Ответ

0 голосов
/ 23 сентября 2019

Пожалуйста, не обращайте внимания на этот вопрос, очевидно, что функции временных таблиц могут принимать таблицы, которые (для меня) убираются.

Что-то с эффектом:

SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b

Может быть принято как временноеТаблица Функция с b в качестве ключа и max_proctime в качестве атрибута времени.Я полагаю, что MAX (proctime) как-то заставляет думать, что появляются новые строки, когда они только перезаписываются?Я думаю, что мне нужно больше времени, чтобы понять это.

РЕДАКТИРОВАТЬ:

Копаясь в исходном коде, мы обнаруживаем, что функции временной таблицы, кажется, принимают отводимые определения, но только если это время обработки:

Temporal ProcessTime JoinOperator.java :

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
        rightState.update(element.getValue());
        registerProcessingCleanupTimer();
    } else {
        rightState.clear();
        cleanupLastTimer();
    }
}

Temporal RowTime JoinOperator.java :

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    ...
    checkNotRetraction(row);
    ...
}
private void checkNotRetraction(BaseRow row) {
    if (BaseRowUtil.isRetractMsg(row)) {
        String className = getClass().getSimpleName();
        throw new IllegalStateException(
            "Retractions are not supported by " + className +
                ". If this can happen it should be validated during planning!");
    }
}

Это без документов;Я не знаю, является ли это постоянным, и будет ли документация обновлена.

...