Похоже, вам нужно установить схему на PCollection
.В прохождении, которое вы связали, есть Create...withCoder()
, который справится с этим.В вашем случае схема не может быть выведена из вашего ParDo
, единственная информация, которую потенциально может увидеть Beam, - это то, что он выводит элементы типа Row
, но нет никакой доступной ему информации о том, придерживается ли ваш ParDo
хотя бы одинсхема для всех выходов.Поэтому вам нужно позвонить pcollection.setRowSchema()
, прежде чем применять SqlTransform
, чтобы сообщить Beam, какую схему вы планируете использовать при конвертации ParDo
.
Обновление
И похоже, что большая часть того, что вы делаете до HCatalog
, вероятно, в конечном итоге сильно упростится, например, представьте, что вам нужно указать что-то вроде pipeline.apply(TextIO.readCsvRows(schema)).apply(sqlTransform)...
.Фактически Beam SQL поддерживает чтение CSV-файлов без дополнительного преобразования ParDos
(через TextTableProvider
), но пока не подключено до SqlTransform
и доступно только через Beam SQL CLI