Передать всю строку в качестве параметра пользовательской табличной функции в Flink Table API - PullRequest
0 голосов
/ 17 апреля 2020

Как я могу передать всю Row моей ScalarFunction RowToTupleConverter в следующем коде? Во всех примерах рассматривается только передача одного или нескольких значений по имени, но я хочу, чтобы весь результат инструкции SELECT передавался как Row. Я предположил, что использовал *, но это не распознается как допустимый параметр.

envT.registerFunction("toTuple", new RowToTupleConverter());
envT.createTemporaryView("t", envT.fromDataStream(ds));                     
Table result = envT.from("t").select("getAvroFieldString(f1, 'HASH_KEY') as hk,
               getAvroFieldLong(f1, 'LOAD_DATE') as ld, 'test' as NAME");
envT.toAppendStream(result.select("*").map("toTuple(*)"), new TupleTypeInfo[...]).print();

Я не хочу обращаться к отдельным полям, но к целой строке, так как я создаю все generi c, таким образом, моей ScalarFunction требуется параметр типа Row. Функция выполняет итерацию по строке и создает Tuple2<GenericRecord,GenericRecord>> из значений строки.

Фон:

Задание создается так, потому что нам нужно ключ и значение из источника Kafka, использующего реестр Confluent Schema, и задание должно быть обобщенным c, чтобы разрешить произвольную схему, разрешающую множественные экземпляры без изменения кодовой базы. Единственный способ добиться этого - создать DataStream из FlinkKafkaConsumer, где Tuple2 включает ключ и значение сообщения каждый в экземпляре GenericRecord, и преобразовать его в таблицу Flink. Поскольку GenericRecord является черным ящиком для API таблицы, я следовал рекомендациям в другом потоке и создал простые функции ScalarFunctions, которые извлекают нужные мне значения c. Прямо сейчас эта часть все еще жестко закодирована, но как только все заработает, она также будет обобщенной c. Однако я изо всех сил пытаюсь обернуть таблицу результатов обратно в Tuple2, чтобы записать преобразованные записи обратно в другую Kafka Topi c, поэтому я ввел еще одну функцию ScalarFunction для отображения из строки в Tuple2<GenericRecord,GenericRecord>>.

Возможно ли это, и если да, то как? Если нет, то какой обходной путь можно использовать для решения этой проблемы? Я также был бы признателен за предложения по более элегантному способу в целом, но, судя по количеству исследований, которые я провел в этом направлении, и из-за характера варианта использования, я сомневаюсь, что так и есть. К сожалению, переход на SpecificRecord не возможен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...