Запись Dataframe в Cassandra через Spark приводит к: java .lang.NullPointerException: значение параметра не может быть нулевым - PullRequest
1 голос
/ 27 января 2020

Я пытаюсь записать простой DataFrame из моего PySpark в Cassandra. У меня запущен контейнер Docker.

В моей настройке я читаю содержимое нескольких файлов JSON в DataFrame, используя

df = spark.read.json(path)

... Затем я делаю очень простые c операции над фреймом, например

df = df.withColumn("friends", split(col("friends"), ", ").cast(ArrayType(StringType())))

Моя схема DF затем дает мне:

root
 |-- average_stars: double (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- fans: long (nullable = true)
 |-- friends: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)

, хотя я создал таблицу с помощью:

CREATE TABLE user (
    user_id varchar PRIMARY KEY,
    name varchar,
    review_count int,
    yelping_since date,
    friends set<text>,
    useful int,
    funny int,
    cool int,
    fans int,
    elite set<int>,
    average_stars float
);

Считывается чтение из нескольких JSON файлов, это только один из файлов, который доставляет мне проблемы ..

Вызов

df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode(mode) \
        .options(table=table, keyspace=keyspace) \
        .save()

выдает ошибку:

Caused by: java.lang.NullPointerException: Parameter value cannot be null
    at shade.com.datastax.spark.connector.google.common.base.Preconditions.checkNotNull(Preconditions.java:226)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:511)
    at com.datastax.driver.core.CodecRegistry.maybeCreateCodec(CodecRegistry.java:630)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:538)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:520)
    at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:470)
    at com.datastax.spark.connector.util.CodecRegistryUtil$.codecFor(CodecRegistryUtil.scala:8)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnUnset(BoundStatementBuilder.scala:72)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$bind$1.apply$mcVI$sp(BoundStatementBuilder.scala:106)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:101)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Я совершенно и совершенно ошеломлен тем, что я здесь пропускаю - со всеми другими файлами он работает нормально. Поскольку сообщения об ошибках читаются как Parameter value cannot be null Я предполагаю, что я упустил из виду некоторую форму конфигурации - но как же тогда, что все мои другие Dataframes пишут в Cassandra нормально?

Помощь очень ценится: -)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...