Как я могу передать всю Row
моей ScalarFunction RowToTupleConverter
в следующем коде? Во всех примерах рассматривается только передача одного или нескольких значений по имени, но я хочу, чтобы весь результат инструкции SELECT передавался как Row
. Я предположил, что использовал *, но это не распознается как допустимый параметр.
envT.registerFunction("toTuple", new RowToTupleConverter());
envT.createTemporaryView("t", envT.fromDataStream(ds));
Table result = envT.from("t").select("getAvroFieldString(f1, 'HASH_KEY') as hk,
getAvroFieldLong(f1, 'LOAD_DATE') as ld, 'test' as NAME");
envT.toAppendStream(result.select("*").map("toTuple(*)"), new TupleTypeInfo[...]).print();
Я не хочу обращаться к отдельным полям, но к целой строке, так как я создаю все generi c, таким образом, моей ScalarFunction требуется параметр типа Row
. Функция выполняет итерацию по строке и создает Tuple2<GenericRecord,GenericRecord>>
из значений строки.
Фон:
Задание создается так, потому что нам нужно ключ и значение из источника Kafka, использующего реестр Confluent Schema, и задание должно быть обобщенным c, чтобы разрешить произвольную схему, разрешающую множественные экземпляры без изменения кодовой базы. Единственный способ добиться этого - создать DataStream из FlinkKafkaConsumer
, где Tuple2
включает ключ и значение сообщения каждый в экземпляре GenericRecord
, и преобразовать его в таблицу Flink. Поскольку GenericRecord
является черным ящиком для API таблицы, я следовал рекомендациям в другом потоке и создал простые функции ScalarFunctions, которые извлекают нужные мне значения c. Прямо сейчас эта часть все еще жестко закодирована, но как только все заработает, она также будет обобщенной c. Однако я изо всех сил пытаюсь обернуть таблицу результатов обратно в Tuple2
, чтобы записать преобразованные записи обратно в другую Kafka Topi c, поэтому я ввел еще одну функцию ScalarFunction для отображения из строки в Tuple2<GenericRecord,GenericRecord>>
.
Возможно ли это, и если да, то как? Если нет, то какой обходной путь можно использовать для решения этой проблемы? Я также был бы признателен за предложения по более элегантному способу в целом, но, судя по количеству исследований, которые я провел в этом направлении, и из-за характера варианта использования, я сомневаюсь, что так и есть. К сожалению, переход на SpecificRecord не возможен.