Использование PySpark для эффективного объединения множества небольших файлов CSV (130 000 с 2 столбцами в каждом) в один большой кадр - PullRequest
0 голосов
/ 18 февраля 2020

Это еще одно продолжение предыдущего вопроса, который я опубликовал Как я могу эффективно объединить эти много CSV-файлов (около 130 000), используя PySpark, в один большой набор данных?

У меня есть следующее набор данных https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip

В нем содержится список файлов (около 130 000). В главном каталоге перечислены их подкаталоги, поэтому первая ячейка может быть A / AAAAA, а файл будет находиться по адресу /data/A/AAAAA.csv

Все файлы имеют В аналогичном формате первый столбец называется DATE, а второй столбец - серией, которая называется VALUE. Итак, во-первых, имя столбца VALUE необходимо переименовать в имя файла в каждом файле CSV . Во-вторых, кадры должны быть полностью внешними, соединенными друг с другом с ДАТА в качестве основного индекса. В-третьих, я хочу сохранить файл и иметь возможность загружать и манипулировать им. Файл должен быть около N строк (число дат) примерно 130 001.

Я пытаюсь выполнить полное внешнее объединение всех файлов в один кадр данных, ранее я пытался pandas, но при попытке исчерпал память чтобы составить список файлов, и кто-то порекомендовал мне вместо этого использовать PySpark.

В предыдущем посте мне сказали, что я могу сделать это:

df = spark.read.csv("/kaggle/input/bf-csv-2/BF_csv_2/data/**/*.csv", "date DATE, value DOUBLE")

Но все столбцы именованное значение, и фрейм просто становится двумя столбцами, первый столбец - ДАТА, а второй - ЗНАЧЕНИЕ, он загружается довольно быстро, около 38 секунд и около 3,8 миллиона значений на 2 столбца, поэтому я знаю, что он не выполняет полное внешнее соединение, он добавляет файлы по строкам.

Итак, я попробовал следующий код:

import pandas as pd
import time
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.sql.types import *

filelist = pd.read_excel("/kaggle/input/list/BF_csv_2.xlsx") #list of filenames

firstname = min(filelist.File)
length = len(filelist.File)

dff = spark.read.csv(f"/kaggle/input/bf-csv-2/BF_csv_2/data/" + firstname, inferSchema = True, header = True).withColumnRenamed("VALUE",firstname) #read file and changes name of column to filename

for row in filelist.File.items():
    if row == firstname:
        continue

    print (row[1],length,end='', flush=True)
    df = spark.read.csv(f"/kaggle/input/bf-csv-2/BF_csv_2/data/" + row[1], inferSchema = True, header = True).withColumnRenamed("VALUE",row[1][:-4])
    #df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))

    dff = dff.join(df, ['DATE'], how='full')

    length -= 1

dff.write.save('/kaggle/working/whatever', format='parquet', mode='overwrite')

Итак, чтобы проверить это, я пытаюсь загрузить функцию df.show () после того, как 3 столбца слились и это довольно быстро. Но когда я пробую около 25 столбцов, это занимает около 2 минут. Когда я пробую 500 столбцов, это почти невозможно.

Не думаю, что я делаю это правильно. Форматирование и все правильно. Но почему это так долго? Как правильно использовать PySpark? Есть ли лучшие библиотеки для достижения того, что мне нужно?

1 Ответ

3 голосов
/ 19 февраля 2020

Spark не делает ничего волшебного по сравнению с другими программами. Сила искры заключается в параллельной обработке. В большинстве случаев это означает, что вы можете использовать несколько машин для выполнения работы. Если вы используете spark локально, у вас могут быть те же проблемы, что и при использовании pandas.

При этом, возможно, у вас есть возможность запустить его локально с помощью Spark, поскольку при определенных условиях он может пролиться на диск и не должен иметь все в памяти.

I ' Я не стих в PySpark, но подход, который я выбрал бы:

  1. загрузка всех файлов, используя, как вы /kaggle/input/bf-csv-2/BF_csv_2/data/**/*.csv
  2. Используйте функцию from pyspark.sql.functions import input_file_name, которая позволяет вам чтобы получить путь для каждой записи в вашем DF (df.select("date", "value", input_file_name().as("filename")) или подобном)
  3. Разобрать путь в формат, который я хотел бы иметь в виде столбца (например, извлечь имя файла)
  4. схема должна выглядеть так: date, value, filename на этом шаге
  5. использовать PySpark, эквивалентный df.groupBy("date").pivot("filename").agg(first("value")). Примечание: я использовал first(), потому что я думаю, что у вас есть 1 или 0 возможных записей
  6. Также попробуйте: установить количество разделов равным количеству дат, которые вы получили
  7. Если вы хотите вывод как один файл, не забудьте repartition(1) до df.write. Этот шаг может быть проблематичным c в зависимости от размера данных. Вам не нужно делать это, если вы планируете продолжать использовать Spark для своей работы, поскольку вы можете загрузить данные, используя тот же подход, что и в шаге 1 (/new_result_data/*.csv)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...