Преобразование файлов CSV из нескольких каталогов в паркет в PySpark - PullRequest
1 голос
/ 30 мая 2020

У меня есть файлы CSV из нескольких путей, которые не являются родительскими каталогами в корзине s3. Все таблицы имеют одинаковые ключи разделов.

каталог s3:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...

Мне нужно преобразовать эти файлы csv в файлы parquet и сохранить их в другом ведре s3 с такой же структурой каталогов.

каталог другого s3:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...

У меня есть решение: перебирать ведро s3 и находить файл CSV, преобразовывать его в паркет и сохранять на другой путь S3. Я считаю, что этот способ неэффективен, потому что у меня есть al oop, и я конвертировал один файл в один файл.

Я хочу использовать библиотеку Spark для повышения эффективности. Затем я попробовал:

spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')

Этот способ хорошо работает для каждой таблицы, но для большей его оптимизации я хочу взять table_name в качестве параметра, например:

TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')

Спасибо

1 Ответ

1 голос
/ 30 мая 2020

Упомянутый вопрос предлагает решения для одновременного чтения нескольких файлов. Метод spark.read.csv(...) принимает один или несколько путей, как показано здесь . Для чтения файлов можно применить тот же лог c. Хотя, когда дело доходит до написания, Spark объединит все заданные наборы данных / пути в один Dataframe. Поэтому невозможно сгенерировать из одного единственного кадра данных несколько кадров данных без применения сначала настраиваемого logi c. Итак, в заключение, не существует такого метода для извлечения исходного фрейма данных непосредственно в несколько каталогов, то есть df.write.csv(*TABLE_NAMES).

Хорошей новостью является то, что Spark предоставляет специальную функцию, а именно input_file_name () , которая возвращает путь к файлу текущей записи. Вы можете использовать его в сочетании с TABLE_NAMES для фильтрации по имени таблицы.

Вот одно из возможных непроверенных решений PySpark:

from pyspark.sql.functions import input_file_name 

TABLE_NAMES = [table_name_1, table_name_2, ...]

source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]

all_df = spark.read.csv(*input_paths) \
              .withColumn("file_name", input_file_name()) \
              .cache()

dest_path = "s3n://another_bucket/"

def write_table(table_name: string) -> None:
   all_df.where(all_df["file_name"].contains(table_name))
     .write
     .partitionBy('partition_key_1','partition_key_2')
     .parquet(f"{dest_path}/{table_name}")

for t in TABLE_NAMES:
   write_table(t)

Пояснение:

  • Мы генерируем и сохраняем входные пути в input_paths. Это создаст такие пути, как: s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN.

  • Затем мы загружаем все пути в один фрейм данных, в который добавляем новый столбец с именем file_name, он будет содержать путь каждый ряд. Обратите внимание, что здесь мы также используем cache, это важно, поскольку в следующем коде у нас есть несколько действий len(TABLE_NAMES). Использование кеша не позволит нам загружать источник данных снова и снова.

  • Затем мы создаем write_table, который отвечает за сохранение данных для данной таблицы. Следующим шагом является фильтрация на основе имени таблицы с использованием all_df["file_name"].contains(table_name), при этом будут возвращены только записи, содержащие значение table_name в столбце file_name. Наконец, мы сохраняем отфильтрованные данные, как и вы.

  • На последнем шаге мы вызываем write_table для каждого элемента TABLE_NAMES.

Ссылки по теме

Как импортировать несколько файлов csv за одну загрузку?

Получить путь к файлу HDFS в PySpark для файлов в формате файла последовательности

...