Я пытаюсь прочитать разделенные по дате avro-файлы, хранящиеся в облачном хранилище Google, с помощью pyspark. Ниже представлена структура моей папки, и это иерархия, которая исходит от третьей стороны и не может быть изменена.
email.bounce
- event_type=users.messages.email.Bounce
- date=2019-01-14-04/
- 422/
- prod03
-data.avro
- date=2019-01-15-04/
- 422/
- prod03
-data.avro
- date=2019-01-16-04/
- 422/
- prod03
-data.avro
Ниже показан файл data.avro:
Objavro.schemaì{"type":"record","name":"Bounce","namespace":"some value","doc":"when an email bounces","fields":[{"name":"id","type":"string","doc":"globally unique id for this event"},{"name":"user_id","type":"string","doc":"BSON id of the user that this email was sent to"},{"name":"external_user_id","type":["null","string"],"doc":"external user id of the user","default":null},{"name":"time","type":"int","doc":"unix timestamp at which the email bounced"},{"name":"timezone","type":["null","string"],"doc":"timezone of the user","default":null}
Я пытаюсь сделать следующее с помощью pyspark:
file_path = "gs://bucket/email.bounce/event_type=users.messages.email.Bounce/*"
df = spark.read.format("com.databricks.spark.avro").load(file_path)
df.show()
2 Ошибки:
1- Не найдено ни одного файла avro, не уверен, где его установить. Я попытался установить avro.mapred.ignore.inputs.without.extension в false с помощью
conf = SparkConf().set("avro.mapred.ignore.inputs.without.extension", "false")
sc = pyspark.SparkContext(conf=conf)
2 - Если я даю точный путь к файлу avro, который указывается до data.avro, и читаю его с помощью pyspark, я получаю следующую ошибку:
An error occurred while calling o782.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, temp-spark-w-1.c.namshi-analytics.internal, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
Любые предложения будут полезны.