Я пытаюсь записать простой DataFrame из моего PySpark в Cassandra. У меня запущен контейнер Docker.
В моей настройке я читаю содержимое нескольких файлов JSON в DataFrame, используя
df = spark.read.json(path)
... Затем я делаю очень простые c операции над фреймом, например
df = df.withColumn("friends", split(col("friends"), ", ").cast(ArrayType(StringType())))
Моя схема DF затем дает мне:
root
|-- average_stars: double (nullable = true)
|-- cool: long (nullable = true)
|-- elite: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- fans: long (nullable = true)
|-- friends: array (nullable = true)
| |-- element: string (containsNull = true)
|-- funny: long (nullable = true)
|-- name: string (nullable = true)
|-- review_count: long (nullable = true)
|-- useful: long (nullable = true)
|-- user_id: string (nullable = true)
|-- yelping_since: timestamp (nullable = true)
, хотя я создал таблицу с помощью:
CREATE TABLE user (
user_id varchar PRIMARY KEY,
name varchar,
review_count int,
yelping_since date,
friends set<text>,
useful int,
funny int,
cool int,
fans int,
elite set<int>,
average_stars float
);
Считывается чтение из нескольких JSON файлов, это только один из файлов, который доставляет мне проблемы ..
Вызов
df.write \
.format("org.apache.spark.sql.cassandra") \
.mode(mode) \
.options(table=table, keyspace=keyspace) \
.save()
выдает ошибку:
Caused by: java.lang.NullPointerException: Parameter value cannot be null
at shade.com.datastax.spark.connector.google.common.base.Preconditions.checkNotNull(Preconditions.java:226)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:511)
at com.datastax.driver.core.CodecRegistry.maybeCreateCodec(CodecRegistry.java:630)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:538)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:520)
at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:470)
at com.datastax.spark.connector.util.CodecRegistryUtil$.codecFor(CodecRegistryUtil.scala:8)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnUnset(BoundStatementBuilder.scala:72)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$bind$1.apply$mcVI$sp(BoundStatementBuilder.scala:106)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:101)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Я совершенно и совершенно ошеломлен тем, что я здесь пропускаю - со всеми другими файлами он работает нормально. Поскольку сообщения об ошибках читаются как Parameter value cannot be null
Я предполагаю, что я упустил из виду некоторую форму конфигурации - но как же тогда, что все мои другие Dataframes пишут в Cassandra нормально?
Помощь очень ценится: -)