Pyspark: объединить все сжатые csvs в один csv в python - PullRequest
0 голосов
/ 22 февраля 2019

Если у меня есть огромные данные в виде заархивированных csvs, как я могу объединить их в один файл csv (сжатый вывод или нет, не имеет значения)?

Я читаю их в искровые датафреймыно затем я застрял на том, как объединить pyspark Dataframes.

Ниже мой код, который запускает цикл и хочет добавить Dataframe для каждого запуска цикла:

        schema=StructType([])
        result = spark.createDataFrame(sc.emptyRDD(), schema)
        for day in range(1,31):
            day_str = str(day) if day>=10 else "0"+str(day)
            print 'Ingesting %s' % day_str
            df = spark.read.format("csv").option("header", "false").option("delimiter", "|").option("inferSchema", "true").load("s3a://key/201811%s" % (day_str))
            result = result.unionAll(df)

        result.write.save("s3a://key/my_result.csv", format='csv')

Это дает мне ошибку AnalysisException: u"Union can only be performed on tables with the same number of columns, but the first table has 0 columns and the second table has 1 columns;;\n'Union\n:- LogicalRDD\n+- Relation[_c0#75] csv\n".Может ли кто-нибудь помочь мне, как я могу продолжить?

1 Ответ

0 голосов
/ 23 февраля 2019

Это сработало для меня:

result=spark.createDataFrame(sc.emptyRDD(), schema_mw)

for day in range(1,31):
    day_str = str(day) if day>=10 else "0"+str(day)
    print 'Ingesting %s' % day_str

    df = spark.read.format("csv").option("header", "false").option("delimiter", ",").schema(schema_mw).load("s3a://bucket/201811%s" % (day_str))

    if result:
        result = result.union(df)
    else:
        result = df
result.repartition(1).write.save("s3a://bucket/key-Compiled", format='csv', header=False)

Это работает, однако, когда я пытаюсь загрузить заголовок как true на последнем шаге перераспределения, заголовок сохраняется как строка.Я не уверен, как добавить эти заголовки как заголовок, а не как строку.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...