Разверните вложенные PCollection с помощью BeamSQL - PullRequest
0 голосов
/ 07 мая 2019

Попробуйте использовать BeamSQL для удаления вложенного типа PCollection.Предположим, что PCollection, где есть сотрудники и его детали.Здесь детали находятся во вложенной коллекции.Так что, если мы используем BeamSQL как "SELECT PCOLLECTION.details FROM PCOLLECTION", то получим вложенный тип деталей в виде коллекции массивов в отдельной PCollection.Однако, когда я хочу получить конкретный столбец из коллекции вложенных типов в качестве подробностей, я получаю сообщение об ошибке, например, невозможно найти имя столбца.Пробовал BeamSQL вроде (похоже на BigQuery SQL) "SELECT X.address FROM PCOLLECTION, Unnest(details) as X", затем получал исключение nullpointer.Используется версия 2.12.0 apache beam.

Оцените кого-нибудь, пожалуйста, помогите по этому вопросу.

Ниже приведен пример данных вложенных данных Value (в деталях есть электронная почта, телефонные столбцы.n 'no списка деталей. Здесь есть два списка деталей):

WARNING: printValue:Row:[[Row:[lourdurajan@gmail.com, 9840618047], Row:[lourdurajan@sanmina.com, 9840618047]]]

Вот трассировка стека Java для второго оператора select:

SELECT `X`.`email`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`,
UNNEST(`PCOLLECTION`.`details`) AS `X`
May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(email=[$3])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])
    Uncollect
      LogicalProject(details=[$cor0.details_2])
        LogicalValues(tuples=[[{ 0 }]])

May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..4=[{inputs}], email=[$t3])
  BeamUnnestRel(unnestIndex=[2])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

[WARNING] 
java.lang.NullPointerException
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:171)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:93)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:87)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:111)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
    at com.sanmina.BeamSQLUnnest.main(BeamSQLUnnest.java:217)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)
...