Запись целого числа / строки в текстовый файл в pyspark из кластера - PullRequest
1 голос
/ 23 марта 2020

Я использую шаговые функции EMR для анализа данных. Я хотел сохранить количество проанализированных данных, чтобы решить, могу ли я сохранить его как CSV или паркет. Я бы предпочел CSV, но если размер слишком велик, я не смогу скачать его и использовать на своем ноутбуке. Я использовал метод count(), чтобы сохранить его в переменной int limit Когда я пытаюсь использовать следующий код:

coalesce(1).write.format("text").option("header", "false").mode("overwrite").save("output.txt")

Он говорит, что:

int не имеет любой атрибут с именем write

Есть ли способ записать целые числа или строку в файл, чтобы я мог открыть его в своем контейнере s3 и проверить после выполнения шага EMR?

Обновление: Я попробовал метод dataframe, предложенный @Shu, но получаю следующую ошибку.

Причина: org. apache .spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 13.0 провалилась 4 раза, последний сбой: потерянное задание 0.3 на этапе 13.0 (TID 19396, ip-10-210-13-34.ec2.internal, исполнитель 11): org. apache .spark.SparkException: не удалось выполнить задачу при записи строк. в орг. apache .spark. sql .execution.datasources.FileFormatWriter $ .org $ apache $ spark $ sql $ исполнительный $ источники данных $ FileFormatWriter $$ executeTask (FileFormatWriter. scala: 257) в орг. apache .spark. sql .execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter. scala: 170)

Что может быть причиной root этого?

1 Ответ

1 голос
/ 23 марта 2020

Вы можете parallelize переменную int, чтобы создать rdd, а затем выполнить запись в HDFS, используя .saveAsTextFile

df.show()
#+---+
#| _1|
#+---+
#|  a|
#|  b|
#+---+
limit=df.count()
spark.sparkContext.parallelize([limit]).coalesce(1).saveAsTextFile("<path>")

#content of file
#cat <path>/part-00000
#2 

Другим способом будет создание dataframe из count variable, затем напишите в формате csv в качестве заголовка false.

from pyspark.sql.types import *
spark.createDataFrame(spark.sparkContext.parallelize([limit]),IntegerType()).coalesce(1).write.format("csv").option("header", "false").mode("overwrite").save("<path>")

#or in text format
spark.createDataFrame(spark.sparkContext.parallelize([limit]),StringType()).coalesce(1).write.format("text").mode("overwrite").save("<path>")

#cat part-*
#2
...