Beam SQL не будет работать при использовании агрегации в операторе: «Невозможно спланировать выполнение» - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть базовый конвейер Beam, который читает из GCS, выполняет преобразование Beam SQL и записывает результаты в BigQuery.

Когда я не выполняю агрегацию в своем операторе SQL, он работает нормально:

..
PCollection<Row> outputStream =
                sqlRows.apply(
                        "sql_transform",
                        SqlTransform.query("select views from PCOLLECTION"));
outputStream.setCoder(SCHEMA.getRowCoder());
..

Однако, когда я пытаюсь агрегировать с суммой, это дает сбой (выдает исключение CannotPlanException):

..
PCollection<Row> outputStream =
                sqlRows.apply(
                        "sql_transform",
                        SqlTransform.query("select wikimedia_project, sum(views) from PCOLLECTION group by wikimedia_project"));
outputStream.setCoder(SCHEMA.getRowCoder());
..

Stacktrace:

Step #1: 11:47:37,562 0    [main] INFO  org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 117 files. Enable logging at DEBUG level to see which files will be staged.
Step #1: 11:47:39,845 2283 [main] INFO  org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
Step #1: 11:47:40,387 2825 [main] INFO  org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
Step #1:   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
Step #1: 
Step #1: Exception in thread "main" org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner state:
Step #1: 
Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
Step #1: Original rel:
Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
Step #1:   BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 2
Step #1: 
Step #1: Sets:
Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
Step #1:        rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
Step #1:                rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
Step #1:        rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
Step #1:                rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]), rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
Step #1:        rel#6:Subset#1.NONE.[], best=null, importance=0.9
Step #1:                rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)), rowcount=10.0, cumulative cost={inf}
Step #1:        rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
Step #1:                rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]), rowcount=10.0, cumulative cost={inf}
Step #1: 
Step #1: 
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
Step #1:        at org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
Step #1:        at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
Step #1:        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
Step #1:        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
Step #1:        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
Step #1:        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
Step #1:        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
Step #1:        at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
Step #1: :run FAILED
Step #1: 
Step #1: FAILURE: Build failed with an exception.

Iиспользую Beam 2.6.0

Я что-то упускаю из виду?

1 Ответ

0 голосов
/ 14 сентября 2018

Это должно работать, это ошибка.Поданный BEAM-5384 .

Если вы посмотрите на план, у него есть операция LogicalAggregate, которая представляет агрегацию и должна быть реализована Beam.Из-за того, как работает Beam, для реализации агрегации также необходимо извлечь некоторую информацию из операции LogicalProject, которая представляет доступ к полям в select f1, f2, и здесь этого не хватает.Пока не очень ясно, является ли это ошибкой, когда запрос чрезмерно оптимизирован и прогноз удален из плана, или это допустимый вариант использования, который должен поддерживать Beam.

Одно из предложений, которое у меня есть, -попробуйте изменить предложение select, например, изменить порядок полей, добавить дополнительные поля.

Обновление:

По крайней мере одна проблема была причиной этого.В основном, когда ваша схема содержит только те поля, которые вы используете в запросе, проекция не требуется, и Calcite не добавляет ее в план.Однако для агрегации Beam требуется узел проекции для извлечения информации о окнах (это текущая реализация, вероятно, это не правильно).

Обходной путь: Итак, чтобы исправить конкретный запросВы можете добавить дополнительные поля в схему и не использовать их в запросе, это приведет к тому, что Calcite добавит узел проекции в план, и будет применено агрегирование Beam SQL.

В Beam HEAD теперь исправлена ​​эта конкретная проблема: https://github.com/apache/beam/commit/8c35781d62846211e43b6b122b557f8c3fdaec6d#diff-4f4ffa265fe666e99c37c346d50da67dR637

...