Я хотел бы определить функцию MAX_BY
, которая принимает значение типа T
и параметр упорядочения типа Number
и возвращает максимальный элемент из окна в соответствии с порядком (типа T
). Я пробовал
public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {
@Override
public T getValue(Tuple2<T, Number> tuple) {
return tuple.f0;
}
@Override
public Tuple2<T, Number> createAccumulator() {
return Tuple2.of(null, 0L);
}
public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
if (order.doubleValue() > acc.f1.doubleValue()) {
acc.f0 = value;
acc.f1 = order;
}
}
}
, но не могу зарегистрировать такую функцию с помощью TableEnvironment.registerFunction
. Ниже Flink использует TypeInformation
для сопоставления типов в запросе SQL, и с таким определением он не может определять типы (по крайней мере, я так полагаю). Я видел, что можно предоставить несколько accumulate
функций, но все же - я думаю, что тип возвращаемого значения должен быть одинаковым для каждого перегруженного метода.
Встроенные функции агрегирования работают аналогично тому, что я хочу достичь - MAX
может принимать произвольный тип столбца и возвращать тот же тип. Поэтому, полагаю, я тоже смогу это сделать.