Похоже, UDTF - правильное решение для вас. Вы можете увидеть взорвать в качестве примера UDTF, который принимает массив, а затем выводит N строк, по одной для каждого элемента.
Подпись для вашей UDTF будет аналогичной:
@Udtf(schema = "STRUCT<key VARCHAR, value VARCHAR>")
public <T> List<Struct> expandMapEntries(final Map<String, String> input) {
// output a list of key value pairs as a struct from 'input'
}
Затем вы можете использовать этот UDTF и выбрать из него поля (что-то вроде ниже):
CREATE STREAM expanded AS SELECT EXPAND_MAP_ENTRIES(KeyValues) AS keyVals FROM source;
CREATE STREAM flattened AS keyVals->key as `KEY`, keyVals->value AS VALUE FROM expanded;
Дайте мне знать, если это работает для вас, и не стесняйтесь обращаться к слабость сообщества (@almog) - мне очень интересен этот вариант использования.