pyspark - чтение даты разбитых файлов avro из облачного хранилища Google - PullRequest
0 голосов
/ 16 января 2019

Я пытаюсь прочитать разделенные по дате avro-файлы, хранящиеся в облачном хранилище Google, с помощью pyspark. Ниже представлена ​​структура моей папки, и это иерархия, которая исходит от третьей стороны и не может быть изменена.

email.bounce
   - event_type=users.messages.email.Bounce 
              - date=2019-01-14-04/
                 - 422/
                     - prod03
                        -data.avro 

              - date=2019-01-15-04/
                     - 422/
                       - prod03
                          -data.avro 

              - date=2019-01-16-04/
                     - 422/
                       - prod03
                          -data.avro 

Ниже показан файл data.avro:

Objavro.schemaì{"type":"record","name":"Bounce","namespace":"some value","doc":"when an email bounces","fields":[{"name":"id","type":"string","doc":"globally unique id for this event"},{"name":"user_id","type":"string","doc":"BSON id of the user that this email was sent to"},{"name":"external_user_id","type":["null","string"],"doc":"external user id of the user","default":null},{"name":"time","type":"int","doc":"unix timestamp at which the email bounced"},{"name":"timezone","type":["null","string"],"doc":"timezone of the user","default":null}

Я пытаюсь сделать следующее с помощью pyspark:

file_path = "gs://bucket/email.bounce/event_type=users.messages.email.Bounce/*"

df = spark.read.format("com.databricks.spark.avro").load(file_path)

df.show()

2 Ошибки:

1- Не найдено ни одного файла avro, не уверен, где его установить. Я попытался установить avro.mapred.ignore.inputs.without.extension в false с помощью

conf = SparkConf().set("avro.mapred.ignore.inputs.without.extension", "false")
sc = pyspark.SparkContext(conf=conf)

2 - Если я даю точный путь к файлу avro, который указывается до data.avro, и читаю его с помощью pyspark, я получаю следующую ошибку:

An error occurred while calling o782.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, temp-spark-w-1.c.namshi-analytics.internal, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Любые предложения будут полезны.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...