Функция агрегирования Presto UDF генерирует исключение java.lang.reflect.InvocationTargetException при вызове с непустым набором GROUP BY, но в противном случае это нормально. - PullRequest
0 голосов
/ 15 марта 2019

Я написал пользовательскую статистическую функцию, для которой я вижу следующее поведение. Запрос, подобный следующему, отлично работает,

SELECT
    my_aggregate_function(column)
FROM table

Принимая во внимание:

SELECT
    a,
    my_aggregate_function(column)
FROM table
GROUP BY
    1

бросает java.lang.reflect.InvocationTargetException. Я ожидаю, что я, должно быть, что-то напутал, когда писал свою функцию агрегирования. Эта функция является реализацией алгоритма гиперлогога, но изменена таким образом, что она получает данные из формата в кодировке base64, который регистрируется в другой системе:

@AggregationFunction("hll_merge")
public final class HyperLogLogAggregation
{
    private HyperLogLogAggregation()
    {
    }

    // This function tells us how to process rows into the accumulator state
    @InputFunction
    public static void input(@AggregationState HyperLogLogState state, @SqlType(StandardTypes.VARCHAR) Slice registers)
    {
        byte[] decodedBytes = Base64.getDecoder().decode(registers.getBytes());
        DynamicSliceOutput bytesToDeserialize = new DynamicSliceOutput(decodedBytes.length)
            .appendBytes(decodedBytes);
        merge(state, HyperLogLog.newInstance(bytesToDeserialize.slice()));
    }

    // This function tells us how to combine accumulators
    @CombineFunction
    public static void combine(@AggregationState HyperLogLogState state, @AggregationState HyperLogLogState otherState)
    {
        merge(state, otherState.getHyperLogLog());
    }

    // This function takes our state and merges in a new HLL.
    private static void merge(@AggregationState HyperLogLogState state, HyperLogLog input)
    {
        HyperLogLog previous = state.getHyperLogLog();
        if (previous == null) {
            state.setHyperLogLog(input);
            state.addMemoryUsage(input.estimatedMemorySize());
        }
        else {
            state.addMemoryUsage(-previous.estimatedMemorySize());
            previous.mergeWith(input);
            state.addMemoryUsage(previous.estimatedMemorySize());
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState HyperLogLogState state, BlockBuilder out)
    {
        HyperLogLog hll = state.getHyperLogLog();
        if (hll == null) {
            out.appendNull();
        }
        else {
            Slice output = hll.serialize();
            String encodedData = Base64.getEncoder().encodeToString(output.getBytes());
            DynamicSliceOutput outSlice = new DynamicSliceOutput(encodedData.length());
            outSlice.appendBytes(encodedData.getBytes());
            VARCHAR.writeSlice(out, outSlice.slice());
        }
    }
}

Я хотел бы услышать любые идеи относительно того, в чем может быть проблема, поскольку я полностью озадачен.

...