Это не работает, потому что вы применяете SqlTransform
к конвейеру, а не PCollection
.
Возможно, вы захотите изменить его следующим образом:
// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());
// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));
...
Как работает конвейер Beam с точки зрения высокого уровня:
- создать конвейер;
- применить IO
PTransform
, который читает из какого-то источника и производит PColelction
из некоторогоэлементы, которые он считывает из источника; - chain-apply больше
PTransforms
к PCollection
из предыдущего шага для обработки данных (концептуально, на каждом шаге будут создаваться разные PCollections
); - repeat;
SqlTransform
- это обычный PTransform
, ожидается, что он будет применен к PCollection
элементов и в результате выведет еще один PCollection
.Запрос, указанный в SqlTransform.create()
, применяется к PCollection
.Он ожидает, что данные поступят из волшебной таблицы PCOLLECTION
, которая представляет PCollection
, к которому вы применили SqlTransform
.
То, что вы делаете в своем примере, отличается:
- создать конвейер;
- применить источник
PTransform
, который выдает POutput
не обязательно PCollection
; - , тогда вы игнорируете вывод, если ваш источник, но вместо этого принимаетеисходный конвейер и применить
SqlTransform
непосредственно к нему;
Так что получается, что SqlTransform
в этом случае применяется к «корню» конвейера, а не к PCollection
это исходит из источника.Вместо цепочки PTransforms
, применяемой одна за другой, теперь у вас есть два PTransforms
, примененные к корню независимо друг от друга.
Еще одно предостережение: SqlTransform
ожидает, что входные элементы будут Rows
,потому что SQL как язык работает только с данными, представленными в виде строк.Для этого есть два способа:
- вручную преобразовать элементы, созданные источником, в
Rows
, применив еще один ParDo
между источником и SqlTransform
; - используйте
Schema
среду Beam (например, метод извлечения PCollection.setSchema()
), которая позволяет Beam SQL автоматически преобразовывать элементы ввода в Rows
;