Извлечение данных из neo4j, поскольку Apache Spark Dataframe дает java.util.Collections $ UnmodifiableMap не является допустимым внешним типом для схемы строки - PullRequest
0 голосов
/ 17 мая 2018

Я использую neo4j-spark-connector , и кажется, что я не могу получить данные как Dataframe. (Я могу получить данные как JavaRDD и успешно с ними работать.)

Мой исходный код для загрузки и использования Dataframe выглядит следующим образом:

Dataset<Row> neo4jDF =  neo4j.cypher("match (n:NodeType1) return properties(n)", a).loadDataFrame();
Dataset<Row> df2 = //load df2 

neo4jDF.createOrReplaceTempView("neo4jtable");
df2.createOrReplaceTempView("df2table");

Dataset<Row>  joinedData = ss.sql("SELECT * from df2table, neo4jtable WHERE df2table.id = neo4jtable.id");

Но это давало мне следующее исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: neo4jtable; line 1 pos 26
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:449)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:468)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:453)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
    (omitted some stacktrace, should I add?)

Поэтому я удалил все, что связано с присоединением, добавил простой Dataframe.show() вызов:

Dataset<Row> neo4jDF =  neo4j.cypher("match (n:NodeType1) return properties(n)", a).loadDataFrame();
neo4jDF.show();

Но теперь я получаю следующее исключение:

18/05/16 18:54:12 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true) AS properties(n)#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n))
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 17 more
18/05/16 18:54:12 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true) AS properties(n)#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n))
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 17 more

18/05/16 18:54:12 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
18/05/16 18:54:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
18/05/16 18:54:12 INFO TaskSchedulerImpl: Cancelling stage 2
18/05/16 18:54:12 INFO DAGScheduler: ResultStage 2 (show at Neo4jSparkRDDDemo2.java:85) failed in 0.345 s
18/05/16 18:54:12 INFO DAGScheduler: Job 2 failed: show at Neo4jSparkRDDDemo2.java:85, took 0.369209 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true) AS properties(n)#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n))
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 17 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
    at Neo4jSparkRDDDemo2.main(Neo4jSparkRDDDemo2.java:85)
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true) AS properties(n)#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n)), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, properties(n))
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 17 more

Что здесь не так?

Обновление

Я могу получить данные как JavaRDD<Row>, преобразовать их в JavaRDD<CustomClass> и затем преобразовать этот JavaRDD<CustomClass> экземпляр в Dataset<Row> экземпляр. Но тогда почему сам соединитель не возвращает набор данных успешно?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...