Я получаю информацию об узле neo4j в spark rdd, используя neo4j-spark разъем .Я могу получить RDD<Row>
, вызвав loadNodeRdds()
метод .Но когда я пытаюсь получить фрейм данных, вызывающий loadDataframe()
метод , он выдает исключение (пропустите трассировку стека, если вы обнаружите, что это слишком долго, поскольку основной вопрос может оказаться другим в конце):
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true) AS Condition#4
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 0
:- null
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
(skipped a lot of rows as it made question reach its character limit)
Я не смог многого получить от большого стэкаша выше.
Итак, я взял JavaRDD<Row>
и попытался преобразовать его в DataFrame<Row>
, программно указав StructType
схему .
StructType schema = loadSchema();
Dataset<Row> df = ss.createDataFrame(neo4jJavaRdd , schema);
Это вызвало несколько похожее исключение.
Итак, я взял отдельные свойства одного узла neo4j, подготовил Row
, а затем JavaRDD<Row>
из него, а затем попытался создать из него фрейм данных, программно указав схему следующим образом:
Row row1 = RowFactory.create("val1", " val2", "val3", "val4", "val5", "val6", 152214d, "val7", 152206d, 11160d, "val8");
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("attr1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr3", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr4", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr5", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr6", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr7", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attrd3", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr8", DataTypes.StringType, true));
Это сработало.
Поэтому я проверил все узлы и понял, что не все узлы (то есть все Row
с в JavaRDD<Row>
) имеют одинаковое количество атрибутов.Это должно вызывать сбой подготовки фрейма данных.Можно ли как-то обработать это программно, не требуя создания и указания pojo .