Я делал следующее:
- выборка фрейма данных из neo4j с помощью neo4j-spark-connector
- выборка фрейма данных из hbase с использованием коннектора apache hbase spark
- печать обоихфрейм данных в консоль с помощью
df.show()
, соединяющего оба фрейма данных с использованием spark sql с использованием уникального столбца id, присутствующего в обоих фреймах данных:
Dataset<Row> mergedData = ss.sql("SELECT * from hbasetable, neo4jtable WHERE hbasetable.nodeId = neo4jtable.id");
- печать объединенного фрейма данных в консоль с помощью
mergedData.show()
Это прекрасно работало.Однако теперь я изменил запрос на шифрование, используя который я получал данные neo4j.Раньше мой шифр нравился так:
Match (n:Type1 {caption:'type1caption"'})-[:contains]->(m:Type2) return m.attr1, m.attr2, m.attr3, m.attr4, m.attr5, m.attr6, m.attr7, m.attr8, m.id as id, m.attr9, m.attr10, m.attr11
Теперь это так:
Match (m:Type1) return m.attr1, m.attr2, m.attr3, m.attr4, m.attr5, m.attr6, m.attr7, m.attr8, m.id as id, m.attr9, m.attr10, m.attr11
Но теперь соединение не удается.Это дает мне следующее исключение:
Long is not a valid external type for schema of string
Кажется, что содержимое как нового neo4j dataframe, так и hbase dataframe корректно выбирается, так как neo4jdf.show()
и hbasedf.show()
показывают данные на консоли.Мне было интересно, если ранее объединение работало нормально, и если данные правильно извлекаются, то что может привести к сбою объединения.
Основная проблема заключается в том, что я не могу интерпретировать трассировку стека, напечатанную на консоли.Это выглядит примерно так:
18/06/04 17:47:48 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType) AS m.attr1#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 0
:- null
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
:
:
lot of stack trace omitted
:
:
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true) AS m.attr11#11
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 11
:- null
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
... 16 more
Полный след стека можно найти в gist здесь
Я чувствую, что это должно быть что-то, связанное с несогласованностью в данных neo4j,Если я могу узнать идентификатор узла neo4j, в котором происходит исключение, я могу проверить его.Но я абсолютно не в состоянии интерпретировать эту трассировку стека. Можно ли узнать, на какой записи подготовка фрейма данных не выполнена во время выполнения соединения?
Обновление
Я удалил все, что связано с объединением и hbaseи добавил neo4jdf.show(24000,false);
, и это дает ту же ошибку, что и выше.Есть 23748 записей.Когда я печатаю небольшое количество записей (скажем, neo4jdf.show(1000)
), он печатает их без ошибок.Но когда я разрешаю распечатать 24000 записей, это не удается.Это означает, что что-то не так с некоторым узлом.Но как я могу указать это?