Сбой повторного разбиения с использованием pyspark с ошибкой - PullRequest
1 голос
/ 29 апреля 2020

У меня есть паркет в папке s3 с нижним столбцом. Размер паркета составляет около 40 МБ.

org_id, device_id, channel_id, source, col1, col2

сейчас раздел находится на 3 столбца org_id device_id channel_id

Я хочу изменить раздел на source, org_id, device_id, channel_id. Я использую pyspark для чтения файла из s3 и записи в корзину s3.

sc = SparkContext(appName="parquet_ingestion1").getOrCreate()
spark = SparkSession(sc)
file_path = "s3://some-bucket/some_folder"
print("Reading parquet from s3:{}".format(file_path))
spark_df = spark.read.parquet(file_path)
print("Converting to parquet")
file_path_re = "s3://other_bucket/re-partition"
partition_columns = ["source", "org_id", "device_id", "channel_id "]

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

Я получаю сообщение об ошибке, и файл паркета не создан.

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 1:>                                                        (0 + 8) / 224]20/04/29 13:29:44 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-172-31-43-0.ap-south-1.compute.internal, executor 3): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Затем я попытался

spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)


spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 3:>                                                        (0 + 8) / 224]20/04/29 13:32:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 23, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[Stage 3:==>                                                      (8 + 8) / 224]20/04/29 13:32:22 WARN TaskSetManager: Lost task 0.2 in stage 3.0 (TID 40, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)

Во втором случае это дает сбой, но это также создает паркет. Теперь я не уверен, что он правильно создает все данные для нового раздела. Дайте мне знать, как правильно переразметить паркет.

ОБНОВЛЕНИЕ 1:

from pyspark.sql.types import StringType
for col1 in partition_columns:
    spark_df=spark_df.withColumn(col1, col(col1).cast(dataType=StringType()))             

Пробовал оба spark_df.repartition (1) .write.partitionBy (partition_columns) .mode (' append '). parquet (file_path_re)

spark_df.write.partitionBy (partition_columns) .mode (' append '). parquet (file_path_re)
Я получаю следующую ошибку

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 20, ip-172-31-42-4.ap-south-1.compute.internal, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)

ОБНОВЛЕНИЕ 2:

Теперь я обнаружил, что существует несоответствие схемы в одном из столбцов, где одна строка является строкой, другая является плавающей. Я описал сценарий ниже. Здесь вы можете видеть, что столбец col1 является строкой в ​​одной строке и float для другой строки

org_id, device_id, channel_id, source,    col1, col2
"100"    "device1"  "channel"   "source1"  10    0.1
"100"    "device1"  "channel"   "source2"  "10"  0.1

Я попытался привести столбец col1 к float.it dodn; t сработало

Любое предложение.

Ответы [ 2 ]

0 голосов
/ 01 мая 2020

Root причина проблемы указана в ОБНОВЛЕНИИ2. В моем случае у нас есть 4 приложения (часть различных конвейеров, основанных на источнике), которые пишут в паркетный магазин. 2 app APP1 и APP2 не используют col1, а APP 3 использовал для записи его как float. Недавно APP4 начал получать col1 в своих данных и сохранял их как строку в parquet.parquet, не жалуйся во время записи. При чтении такого паркета было сделано

  1. Я пробовал приведение, оно не сработало
  2. Схема слияния не удалась из-за несоответствия в типе данных
  3. Я попытался отфильтровать данные на основе типа источника , это работало частично в том смысле, что если отфильтровывать данные APP4 это работало. но если отфильтровывать данные APP3 это не работало. сейчас же.

    Решения:
    1. отфильтруйте исходные данные app4 и создайте фрейм данных и преобразуйте их в паркет, а затем отфильтруйте только исходный паркет app4 во фрейме данных и удалите col1 и преобразуйте его в паркет ,

  4. Или Удалить столбец из целого фрейма данных и записать в паркет.

df1 = df.select ([c для c в df.columns, если c! = 'Col1'])

0 голосов
/ 29 апреля 2020

Попробуйте принудительное приведение всех типов partition_columns к StringType

...