Балка SQL - SqlValidatorException: объект 'PCOLLECTION' не найден - PullRequest
0 голосов
/ 02 июля 2019

Я провожу некоторые эксперименты с Beam SQL.Я получаю PCollection<Row> из преобразования SampleSource и передаю его вывод в SqlTransform.

String sql1 = "select c1, c2, c3 from PCOLLECTION where c1 > 1";

Код ниже работает без ошибок.

POutput it = p.apply(new SampleSource()).apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

Однако,когда я пытаюсь выполнить следующие строки кода, я получаю сообщение об ошибке во время выполнения.

POutput it = p.apply(new SampleSource());
it.getPipeline().apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

Сведения об ошибке:

Caused by: org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorException: Object 'PCOLLECTION' not found

Пожалуйста, предоставьте несколько указателей.

1 Ответ

1 голос
/ 02 июля 2019

Это не работает, потому что вы применяете SqlTransform к конвейеру, а не PCollection.

Возможно, вы захотите изменить его следующим образом:


// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());

// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));

...

Как работает конвейер Beam с точки зрения высокого уровня:

  • создать конвейер;
  • применить IO PTransform, который читает из какого-то источника и производит PColelction из некоторогоэлементы, которые он считывает из источника;
  • chain-apply больше PTransforms к PCollection из предыдущего шага для обработки данных (концептуально, на каждом шаге будут создаваться разные PCollections);
  • repeat;

SqlTransform - это обычный PTransform, ожидается, что он будет применен к PCollection элементов и в результате выведет еще один PCollection.Запрос, указанный в SqlTransform.create(), применяется к PCollection.Он ожидает, что данные поступят из волшебной таблицы PCOLLECTION, которая представляет PCollection, к которому вы применили SqlTransform.

То, что вы делаете в своем примере, отличается:

  • создать конвейер;
  • применить источник PTransform, который выдает POutput не обязательно PCollection;
  • , тогда вы игнорируете вывод, если ваш источник, но вместо этого принимаетеисходный конвейер и применить SqlTransform непосредственно к нему;

Так что получается, что SqlTransform в этом случае применяется к «корню» конвейера, а не к PCollectionэто исходит из источника.Вместо цепочки PTransforms, применяемой одна за другой, теперь у вас есть два PTransforms, примененные к корню независимо друг от друга.

Еще одно предостережение: SqlTransform ожидает, что входные элементы будут Rows,потому что SQL как язык работает только с данными, представленными в виде строк.Для этого есть два способа:

  • вручную преобразовать элементы, созданные источником, в Rows, применив еще один ParDo между источником и SqlTransform;
  • используйте Schema среду Beam (например, метод извлечения PCollection.setSchema()), которая позволяет Beam SQL автоматически преобразовывать элементы ввода в Rows;
...