spark python считывает несколько CSV-файлов в dataframe - PullRequest
0 голосов
/ 23 января 2020

У меня есть несколько CSV-файлов на диске данных. Я могу подключиться к файлу данных и даже перечислить файлы. Но мне нужно поместить эти файлы в один фрейм данных, чтобы я мог загрузить этот фрейм в SQL. Загрузка до SQL тоже не проблема. Проблема в том, что только содержимое последнего файла в папке данных читается и записывается в SQL (и, следовательно, также в фрейм данных). Возможно, потому что датафрейм каждый раз перезаписывается. Но я не знаю, как добавлять данные в фрейм данных в каждом цикле. Вот код, который я использую:

    for file in dayfolders.collect():
      filename = file.name
      pathname = file.path
      tablename = "Obelix" 
      if filename.endswith(".csv"): 
          df = spark.read.format("csv")\
          .option("inferschema", "true")\
          .option("header","true")\
          .load(file.path)
          continue
      else:
          continue 

Если я поставлю оператор print (имя файла) сразу после оператора for, я смогу его увидеть. перебирает три файла. Все файлы отдельно обрабатываются просто отлично

Ответы [ 2 ]

0 голосов
/ 24 января 2020

Вам не нужно указывать ForL oop. Вы можете указать «dayfolders / *. Csv» при загрузке, и он загрузит все файлы напрямую и объединит их в фрейм данных.

f = spark.read.format("csv")\
          .option("inferschema", "true")\
          .option("header","true")\
          .load(dayfolders/*.csv")
0 голосов
/ 24 января 2020

Вы можете импортировать, используя список файлов. Они будут автоматически объединены для вас.

csv_import = sqlContext.read\
  .format('csv')\
  .options(sep = ',', header='true', inferSchema='true')\
  .load([file.path for file in dayfolders.collect()])\
  .createOrReplaceTempView(<temporary table name>)

Если вы настроены на чтение в файлах как отдельные кадры данных, вам нужно объединить каждый кадр данных:

for ind, file in enumerate(dayfolders.collect()):
  if ind == 0:
    df = spark.read.format("csv")\
      .option("inferschema", "true")\
      .option("header","true")\
      .load(file.path)
  else:
    df = df.union(spark.read.format("csv")\
      .option("inferschema", "true")\
      .option("header","true")\
      .load(file.path))

I не рекомендую вам это делать. Просто используйте первый метод.

...