Это еще одно продолжение предыдущего вопроса, который я опубликовал Как я могу эффективно объединить эти много CSV-файлов (около 130 000), используя PySpark, в один большой набор данных?
У меня есть следующее набор данных https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip
В нем содержится список файлов (около 130 000). В главном каталоге перечислены их подкаталоги, поэтому первая ячейка может быть A / AAAAA, а файл будет находиться по адресу /data/A/AAAAA.csv
Все файлы имеют В аналогичном формате первый столбец называется DATE, а второй столбец - серией, которая называется VALUE. Итак, во-первых, имя столбца VALUE необходимо переименовать в имя файла в каждом файле CSV . Во-вторых, кадры должны быть полностью внешними, соединенными друг с другом с ДАТА в качестве основного индекса. В-третьих, я хочу сохранить файл и иметь возможность загружать и манипулировать им. Файл должен быть около N строк (число дат) примерно 130 001.
Я пытаюсь выполнить полное внешнее объединение всех файлов в один кадр данных, ранее я пытался pandas, но при попытке исчерпал память чтобы составить список файлов, и кто-то порекомендовал мне вместо этого использовать PySpark.
В предыдущем посте мне сказали, что я могу сделать это:
df = spark.read.csv("/kaggle/input/bf-csv-2/BF_csv_2/data/**/*.csv", "date DATE, value DOUBLE")
Но все столбцы именованное значение, и фрейм просто становится двумя столбцами, первый столбец - ДАТА, а второй - ЗНАЧЕНИЕ, он загружается довольно быстро, около 38 секунд и около 3,8 миллиона значений на 2 столбца, поэтому я знаю, что он не выполняет полное внешнее соединение, он добавляет файлы по строкам.
Итак, я попробовал следующий код:
import pandas as pd
import time
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.sql.types import *
filelist = pd.read_excel("/kaggle/input/list/BF_csv_2.xlsx") #list of filenames
firstname = min(filelist.File)
length = len(filelist.File)
dff = spark.read.csv(f"/kaggle/input/bf-csv-2/BF_csv_2/data/" + firstname, inferSchema = True, header = True).withColumnRenamed("VALUE",firstname) #read file and changes name of column to filename
for row in filelist.File.items():
if row == firstname:
continue
print (row[1],length,end='', flush=True)
df = spark.read.csv(f"/kaggle/input/bf-csv-2/BF_csv_2/data/" + row[1], inferSchema = True, header = True).withColumnRenamed("VALUE",row[1][:-4])
#df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))
dff = dff.join(df, ['DATE'], how='full')
length -= 1
dff.write.save('/kaggle/working/whatever', format='parquet', mode='overwrite')
Итак, чтобы проверить это, я пытаюсь загрузить функцию df.show () после того, как 3 столбца слились и это довольно быстро. Но когда я пробую около 25 столбцов, это занимает около 2 минут. Когда я пробую 500 столбцов, это почти невозможно.
Не думаю, что я делаю это правильно. Форматирование и все правильно. Но почему это так долго? Как правильно использовать PySpark? Есть ли лучшие библиотеки для достижения того, что мне нужно?