Как я могу эффективно объединить эти много CSV-файлов (около 130 000) с помощью PySpark в один большой набор данных? - PullRequest
2 голосов
/ 17 февраля 2020

Я опубликовал этот вопрос ранее и получил несколько советов по использованию PySpark.

Как эффективно объединить этот большой набор данных в один большой массив данных?

ZIP-файл (https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip) содержит папку с именем data, содержащую около 130 000 CSV-файлов. Я хочу объединить их все в один кадр данных. У меня 16 ГБ ОЗУ, и я продолжаю исчерпывать ОЗУ, когда бью первые несколько сотен файлов. Общий размер файлов составляет всего около 300-400 МБ данных.

Если вы откроете какой-либо из файлов CSV, вы увидите, что все они имеют одинаковый формат, первый столбец для дат, и второй столбец для ряда данных.

Так что теперь вместо этого я использую PySpark, однако я не знаю, какой самый эффективный способ соединить все файлы, с pandas фреймами данных, я бы просто констатировал список отдельных фреймов, как это, потому что я хочу их нужно объединить по датам:

bigframe = pd.concat(listofframes,join='outer', axis=0)

Но, как я уже говорил, этот метод не работает, так как у меня очень быстро заканчивается ОЗУ.

Что было бы лучшим способом сделать что-то похожее с использованием PySpark?

Пока у меня есть это (кстати, список файлов ниже - это просто список файлов, которые я хочу вытащить, вы можете проигнорировать это)


import os

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col

from functools import reduce
from pyspark.sql import DataFrame

listdf = []

for subdir, dirs, files in os.walk("/kaggle/input/filelist/"):
    for file in files:
        path = os.path.join(subdir,file)
        print(file)
        filelist = pd.read_excel("/kaggle/input/filelist/" + file)

        for row in filelist.File.items():
            df = spark.read.csv(f"/kaggle/input/master/{file[:-5]}/{file[:-5]}/data/" + row[1], inferSchema = True, header = True)
            df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))
            df.show(3)
            listdf.append(df)

Я останавливаю код после того, как он добавляется как 10 кадров, но когда я пробую код, приведенный ниже, у него просто один столбец данных, он не сливается должным образом.

bigframe = reduce(DataFrame.join(listdf, ['DATE'], how='full'))

Но я остался только с двумя столбцами данных, датой и первым элементом в списке искровых кадров.

Как правильно объединить все в один кадр? Я хочу, чтобы даты были индексом вещи, с которым сливаются другие столбцы. Значение, если один кадр имеет:

Date        TimeSeries1
1 Jan 2012  12345
2 Jan 2012  23456

, а другой имеет

Date        TimeSeries2
1 Jan 2012  5678
3 Jan 2012  8910

Я хочу, чтобы конечный продукт был

Date        TimeSeries1 TimeSeries2
1 Jan 2012  12345       5678
2 Jan 2012  23456
3 Jan 2012              8910

Кроме того, чтобы определить столбцы имена должны быть заменены на имя файла.

Ответы [ 2 ]

0 голосов
/ 04 марта 2020

Здесь много чего происходит, но если я смогу объяснить это необходимостью объединить данные из файлов CSV 130k в один DF и записать имя для каждого файла, вы можете сделать это следующим образом.

from  pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

customSchema = StructType([ \
StructField("asset_id", StringType(), True), \
StructField("price_date", StringType(), True), \
etc., 
StructField("close_price", StringType(), True), \
StructField("filename", StringType(), True)])

fullpath = 'mnt/INTRNTL_csv_2/data/??/*.csv'

df = spark.read.format("csv") \
   .option("header", "false") \
   .option("sep","|") \
   .schema(customSchema) \
   .load(fullPath) \
   .withColumn("filename", input_file_name())

Примечание: самая первая строка кода и самая последняя строка кода используются для получения имен файлов. Также обратите внимание на подстановочные знаки; '?' для одного символа (либо буквы, либо цифры), а '*' - для любого количества символов (любая комбинация букв и цифр).

0 голосов
/ 17 февраля 2020

spark может считывать данные из нескольких файлов по умолчанию, если они содержат одну и ту же схему.

Чтобы обрабатывать каждую временную серию отдельно, вы можете сгруппировать по фрейму данных по имени файла и использовать pandas udf для обработки каждой группы ,

import glob as g
import pyspark.sql.functions as F

@F.pandas_udf("date Date, value DECIMAL(38,4)", F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
  # pdf will be a pandas datafrmme for each timeseries
  # apply timeseries computations here and return a new dataframe
  # with aggregated values
  return pdf

paths = g.glob("./INTRNTL_csv_2/data/**/*.csv", recursive=True)

df = spark.read.csv(paths, header=False, schema="date DATE, value DECIMAL(38,4)")


res = df.withColumn('name', F.input_file_name())
res = res.groupBy('name').apply(transform)
res.show()

...