Соединение двух фреймов данных работало, но теперь происходит сбой при изменении содержимого одного из фреймов данных. - PullRequest
0 голосов
/ 04 июня 2018

Я делал следующее:

  • выборка фрейма данных из neo4j с помощью neo4j-spark-connector
  • выборка фрейма данных из hbase с использованием коннектора apache hbase spark
  • печать обоихфрейм данных в консоль с помощью df.show()
  • , соединяющего оба фрейма данных с использованием spark sql с использованием уникального столбца id, присутствующего в обоих фреймах данных:

    Dataset<Row> mergedData = ss.sql("SELECT * from hbasetable, neo4jtable WHERE hbasetable.nodeId = neo4jtable.id");
    
  • печать объединенного фрейма данных в консоль с помощьюmergedData.show()

Это прекрасно работало.Однако теперь я изменил запрос на шифрование, используя который я получал данные neo4j.Раньше мой шифр нравился так:

Match (n:Type1 {caption:'type1caption"'})-[:contains]->(m:Type2) return m.attr1, m.attr2, m.attr3, m.attr4, m.attr5, m.attr6, m.attr7, m.attr8, m.id as id, m.attr9, m.attr10, m.attr11

Теперь это так:

Match (m:Type1) return m.attr1, m.attr2, m.attr3, m.attr4, m.attr5, m.attr6, m.attr7, m.attr8, m.id as id, m.attr9, m.attr10, m.attr11

Но теперь соединение не удается.Это дает мне следующее исключение:

Long is not a valid external type for schema of string

Кажется, что содержимое как нового neo4j dataframe, так и hbase dataframe корректно выбирается, так как neo4jdf.show() и hbasedf.show() показывают данные на консоли.Мне было интересно, если ранее объединение работало нормально, и если данные правильно извлекаются, то что может привести к сбою объединения.

Основная проблема заключается в том, что я не могу интерпретировать трассировку стека, напечатанную на консоли.Это выглядит примерно так:

18/06/04 17:47:48 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType) AS m.attr1#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1), NullType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, m.attr1)
         +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
            +- input[0, org.apache.spark.sql.Row, true]

     :
     :
   lot of stack trace omitted
     :
     :
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true) AS m.attr11#11
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 11
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 11, m.attr11)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 16 more

Полный след стека можно найти в gist здесь

Я чувствую, что это должно быть что-то, связанное с несогласованностью в данных neo4j,Если я могу узнать идентификатор узла neo4j, в котором происходит исключение, я могу проверить его.Но я абсолютно не в состоянии интерпретировать эту трассировку стека. Можно ли узнать, на какой записи подготовка фрейма данных не выполнена во время выполнения соединения?

Обновление

Я удалил все, что связано с объединением и hbaseи добавил neo4jdf.show(24000,false);, и это дает ту же ошибку, что и выше.Есть 23748 записей.Когда я печатаю небольшое количество записей (скажем, neo4jdf.show(1000)), он печатает их без ошибок.Но когда я разрешаю распечатать 24000 записей, это не удается.Это означает, что что-то не так с некоторым узлом.Но как я могу указать это?

1 Ответ

0 голосов
/ 06 июня 2018

Источник проблемы - неоднородные данные в вашем источнике данных.Spark SQL использует реляционную модель, и разнородные столбцы не допускаются.

Если вы не можете гарантировать, что все записи правильно структурированы, я бы посоветовал извлечь все в виде строки:

return toString(m.attr1), toString(m.attr2), ..., toString(m.attr11)

и привести кнеобходимые типы, используя стандартные операторы Spark:

df.select($"attr1".cast(...), ...)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...