Попробуйте использовать BeamSQL для удаления вложенного типа PCollection.Предположим, что PCollection, где есть сотрудники и его детали.Здесь детали находятся во вложенной коллекции.Так что, если мы используем BeamSQL как "SELECT PCOLLECTION.details FROM PCOLLECTION"
, то получим вложенный тип деталей в виде коллекции массивов в отдельной PCollection.Однако, когда я хочу получить конкретный столбец из коллекции вложенных типов в качестве подробностей, я получаю сообщение об ошибке, например, невозможно найти имя столбца.Пробовал BeamSQL вроде (похоже на BigQuery SQL) "SELECT X.address FROM PCOLLECTION, Unnest(details) as X"
, затем получал исключение nullpointer.Используется версия 2.12.0 apache beam.
Оцените кого-нибудь, пожалуйста, помогите по этому вопросу.
Ниже приведен пример данных вложенных данных Value (в деталях есть электронная почта, телефонные столбцы.n 'no списка деталей. Здесь есть два списка деталей):
WARNING: printValue:Row:[[Row:[lourdurajan@gmail.com, 9840618047], Row:[lourdurajan@sanmina.com, 9840618047]]]
Вот трассировка стека Java для второго оператора select:
SELECT `X`.`email`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`,
UNNEST(`PCOLLECTION`.`details`) AS `X`
May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(email=[$3])
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
BeamIOSourceRel(table=[[beam, PCOLLECTION]])
Uncollect
LogicalProject(details=[$cor0.details_2])
LogicalValues(tuples=[[{ 0 }]])
May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..4=[{inputs}], email=[$t3])
BeamUnnestRel(unnestIndex=[2])
BeamIOSourceRel(table=[[beam, PCOLLECTION]])
[WARNING]
java.lang.NullPointerException
at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:171)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:93)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:87)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:111)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at com.sanmina.BeamSQLUnnest.main(BeamSQLUnnest.java:217)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)