Я написал пользовательскую статистическую функцию, для которой я вижу следующее поведение. Запрос, подобный следующему, отлично работает,
SELECT
my_aggregate_function(column)
FROM table
Принимая во внимание:
SELECT
a,
my_aggregate_function(column)
FROM table
GROUP BY
1
бросает java.lang.reflect.InvocationTargetException
. Я ожидаю, что я, должно быть, что-то напутал, когда писал свою функцию агрегирования. Эта функция является реализацией алгоритма гиперлогога, но изменена таким образом, что она получает данные из формата в кодировке base64, который регистрируется в другой системе:
@AggregationFunction("hll_merge")
public final class HyperLogLogAggregation
{
private HyperLogLogAggregation()
{
}
// This function tells us how to process rows into the accumulator state
@InputFunction
public static void input(@AggregationState HyperLogLogState state, @SqlType(StandardTypes.VARCHAR) Slice registers)
{
byte[] decodedBytes = Base64.getDecoder().decode(registers.getBytes());
DynamicSliceOutput bytesToDeserialize = new DynamicSliceOutput(decodedBytes.length)
.appendBytes(decodedBytes);
merge(state, HyperLogLog.newInstance(bytesToDeserialize.slice()));
}
// This function tells us how to combine accumulators
@CombineFunction
public static void combine(@AggregationState HyperLogLogState state, @AggregationState HyperLogLogState otherState)
{
merge(state, otherState.getHyperLogLog());
}
// This function takes our state and merges in a new HLL.
private static void merge(@AggregationState HyperLogLogState state, HyperLogLog input)
{
HyperLogLog previous = state.getHyperLogLog();
if (previous == null) {
state.setHyperLogLog(input);
state.addMemoryUsage(input.estimatedMemorySize());
}
else {
state.addMemoryUsage(-previous.estimatedMemorySize());
previous.mergeWith(input);
state.addMemoryUsage(previous.estimatedMemorySize());
}
}
@OutputFunction(StandardTypes.VARCHAR)
public static void output(@AggregationState HyperLogLogState state, BlockBuilder out)
{
HyperLogLog hll = state.getHyperLogLog();
if (hll == null) {
out.appendNull();
}
else {
Slice output = hll.serialize();
String encodedData = Base64.getEncoder().encodeToString(output.getBytes());
DynamicSliceOutput outSlice = new DynamicSliceOutput(encodedData.length());
outSlice.appendBytes(encodedData.getBytes());
VARCHAR.writeSlice(out, outSlice.slice());
}
}
}
Я хотел бы услышать любые идеи относительно того, в чем может быть проблема, поскольку я полностью озадачен.