Создать Flink SQL UDF с общим типом возврата c - PullRequest
2 голосов
/ 26 мая 2020

Я хотел бы определить функцию MAX_BY, которая принимает значение типа T и параметр упорядочения типа Number и возвращает максимальный элемент из окна в соответствии с порядком (типа T). Я пробовал

public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {

    @Override
    public T getValue(Tuple2<T, Number> tuple) {
        return tuple.f0;
    }

    @Override
    public Tuple2<T, Number> createAccumulator() {
        return Tuple2.of(null, 0L);
    }

    public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
        if (order.doubleValue() > acc.f1.doubleValue()) {
            acc.f0 = value;
            acc.f1 = order;
        }
    }
}

, но не могу зарегистрировать такую ​​функцию с помощью TableEnvironment.registerFunction. Ниже Flink использует TypeInformation для сопоставления типов в запросе SQL, и с таким определением он не может определять типы (по крайней мере, я так полагаю). Я видел, что можно предоставить несколько accumulate функций, но все же - я думаю, что тип возвращаемого значения должен быть одинаковым для каждого перегруженного метода.

Встроенные функции агрегирования работают аналогично тому, что я хочу достичь - MAX может принимать произвольный тип столбца и возвращать тот же тип. Поэтому, полагаю, я тоже смогу это сделать.

1 Ответ

1 голос
/ 29 мая 2020

К сожалению, Flink не поддерживает функции агрегирования с гибкими типами возврата. Для функции MAX внутренняя реализация определяет основные logi c независимо от типа, а затем создает реализацию для каждого поддерживаемого типа ( см. Код ).

Внутренне, Затем MAX отображается в правильную реализацию, в зависимости от типа.

Я не думаю, что это возможно, если вы определяете и регистрируете функцию как определяемую пользователем функцию агрегирования.

...