Программно вывести схему для подготовки набора данных Spark DataFrame <Row>из RDD <Row>, когда некоторые объекты Row могут содержать различное количество элементов - PullRequest
0 голосов
/ 25 мая 2018

Я получаю информацию об узле neo4j в spark rdd, используя neo4j-spark разъем .Я могу получить RDD<Row>, вызвав loadNodeRdds() метод .Но когда я пытаюсь получить фрейм данных, вызывающий loadDataframe() метод , он выдает исключение (пропустите трассировку стека, если вы обнаружите, что это слишком долго, поскольку основной вопрос может оказаться другим в конце):

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true) AS Condition#4
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

(skipped a lot of rows as it made question reach its character limit)

Я не смог многого получить от большого стэкаша выше.

Итак, я взял JavaRDD<Row> и попытался преобразовать его в DataFrame<Row>, программно указав StructType схему .

StructType schema = loadSchema();
Dataset<Row> df = ss.createDataFrame(neo4jJavaRdd , schema);

Это вызвало несколько похожее исключение.

Итак, я взял отдельные свойства одного узла neo4j, подготовил Row, а затем JavaRDD<Row> из него, а затем попытался создать из него фрейм данных, программно указав схему следующим образом:

Row row1 = RowFactory.create("val1", " val2", "val3", "val4", "val5", "val6", 152214d, "val7", 152206d, 11160d, "val8");
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("attr1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr3", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr4", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr5", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr6", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr7", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attrd3", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr8", DataTypes.StringType, true));

Это сработало.

Поэтому я проверил все узлы и понял, что не все узлы (то есть все Row с в JavaRDD<Row>) имеют одинаковое количество атрибутов.Это должно вызывать сбой подготовки фрейма данных.Можно ли как-то обработать это программно, не требуя создания и указания pojo .

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Есть некоторые вещи, которые я осознал, работая с neo4j-spark-connector , которыми я хочу поделиться здесь.

  1. В целом, если вы собираетесь подготовить фрейм данных , не желательно возвращать типы объектов neo4j, в частности, узел и отношения.Это примерно так, как показано ниже: возвращаемый узел не является предпочтительным:

    MATCH(n {id:'xyz'}) RETURN n
    

    Вместо этого возвращайте свойства:

    MATCH(n {id:'xyz'}) RETURN properties(n)
    
  2. Если вы не уверены, что все узлы будутЕсли число свойств не равно , то лучше возвращать их явно, а не возвращать свойства и получать JavaRDD.Поскольку это потребует от нас обработки JavaRDD снова, чтобы добавить NULL для несуществующих свойств.Это вместо того, чтобы делать это:

    MATCH(n {id:'xyz'}) RETURN properties(n)
    

    вернуть таким образом:

    MATCH(n {id:'xyz'}) RETURN n.prop1 AS prop1, n.prop2 AS prop2, ..., n.propN AS propN
    

    Neo4j сам добавит NULL s для несуществующих свойств, как это видно на рисунке ниженам не нужно повторять их снова.Вернув его, я смог получить информацию об узле neo4j напрямую, используя метод loadDataframe().

    enter image description here

0 голосов
/ 27 мая 2018

Если вы хотите сделать это с помощью RDD, как вы упомянули, выполните следующие действия:

  • Прежде чем пытаться преобразовать (RDD + схема) в кадр данных, перейдите к RDD (используяфункции карты) и убедитесь, что каждая строка имеет все соответствующие атрибуты.
  • Если атрибут отсутствует в строке, добавьте его и сделайте его пустым.

После этогостроки RDD будут иметь одинаковую схему, и преобразование в фрейм данных будет работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...