PySpark Чтение нескольких файлов параллельно - PullRequest
0 голосов
/ 05 сентября 2018

В моем проекте есть следующее требование, и мы пытаемся использовать PySpark для обработки данных.

Мы использовали для получения данных датчиков в виде файлов Parquet для каждого транспортного средства и один файл для каждого транспортного средства. В файле много датчиков, но структурированные данные в формате Parquet. Средний размер файла составляет 200 МБ на файл.

Предположим, я получил файлы, как показано ниже, в одном пакете и готов к обработке.

Дата изменения размера файла поезда

X1 210 МБ 05-Sep-18 00:10

X1 280 МБ 05-Sep-18 17:10

Y1 220 МБ 05-Sep-18 04:10

Y1 241MB 05.09.09 18:10

В конце обработки мне нужно получить один агрегированный файл .csv из каждого исходного файла или один главный файл с агрегированными данными для всех этих транспортных средств.

Мне известно, что размер блока HDFS по умолчанию составляет 128 МБ, и каждый файл будет разделен на 2 блока. Могу ли я узнать, как я могу выполнить это требование с помощью PySpark? Можно ли обрабатывать все эти файлы параллельно?

Пожалуйста, дайте мне знать ваши мысли

Ответы [ 2 ]

0 голосов
/ 18 октября 2018

У меня была похожая проблема, и кажется, что я нашел способ: 1. Получить список файлов 2. Распараллелить этот список (распределить по всем узлам) 3. напишите функцию, которая читает содержимое всех файлов из части большого списка, которая была распространена на узел 4. запустите его с mapPartition, затем соберите результат в виде списка, каждый элемент представляет собой собранное содержимое каждого файла. Файлы FOT, хранящиеся в файлах AWS s3 и json:

def read_files_from_list(file_list):
#reads files from  list
#returns content as list of strings, 1 json per string ['{}','{}',...]
   out=[]
   for x in file_list:
      content = sp.check_output([ 'aws', 's3', 'cp', x, '-']) # content of the file. x here is a full path: 's3://bucket/folder/1.json'
      out.append(content)   
   return out #content of all files from the file_list as list of strings, 1 json per string ['{}','{}',...]


file_list=['f1.json','f2.json',...]
    ps3="s3://bucket/folder/"
    full_path_chunk=[ps3 + f for f in file_list] #makes list  of strings, with full path for each file
    n_parts = 100
    rdd1 = sc.parallelize(full_path_chunk, n_parts ) #distribute files among nodes
    list_of_json_strings = rdd1.mapPartitions(read_files_from_list).collect()

Затем, при необходимости, вы можете создать искровой фрейм данных следующим образом:

rdd2=sc.parallelize(list_of_json_strings) #this is a trick! via http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
df_spark=sqlContext.read.json(rdd2)

Функция read_files_from_list является лишь примером, ее следует изменить, чтобы читать файлы из hdfs с помощью инструментов python. Надеюсь, это поможет:)

0 голосов
/ 05 сентября 2018

Вы можете поместить все входные файлы в одну и ту же директорию, а затем передать путь к директории в spark. Вы также можете использовать сглаживание как /data_dir/*.csv.

...