Пример ввода:
+--------------+-------------+-------------------+------+-------------+-----------------+
| file_name|file_metadata| data_content|status| error_type| error_message|
+--------------+-------------+-------------------+------+-------------+-----------------+
|test_file.json| metadata|{ "fruit": "Apple"}|FAILED| INVALID_JSON| could not parse|
|demo_file.json| metadata| { "fruit": "Ab"}|FAILED|MISSING_RULES|No matching rules|
+--------------+-------------+-------------------+------+-------------+-----------------+
Сначала мы объединим столбец error_type
и столбец file_name
(только имя файла, исключая расширение) для создания newColumn
.
final_df = df_new.withColumn("newColumn",concat(col("error_type"),lit("_"),split("file_name","\.")[0]))
При запуске df_new.show(truncate=false)
мы увидим пример вывода в следующем виде:
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|file_name |file_metadata|data_content |status|error_type |error_message |newColumn |
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|test_file.json|metadata |{ "fruit": "Apple"}|FAILED|INVALID_JSON |could not parse |INVALID_JSON_test_file |
|demo_file.json|metadata |{ "fruit": "Ab"} |FAILED|MISSING_RULES|No matching rules|MISSING_RULES_demo_file|
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
Чтобы получить структуру каталогов в требуемом формате, например: Base_directory / INVALID_JSON_test_file, во время записи нам придется разбить final_df на основе созданного newColumn
.
Мы можем написать, используя следующее:
final_df.select("data_content","newColumn").write.partitionBy("newColumn").save(FilePath)
По умолчанию будет записан файл паркета. Я не думаю, что будет возможно записать вывод в виде текстового файла, так как он не принимает несколько столбцов, нам потребуется записать newColumn
вместе с data_content
, потому что мы разделяем фрейм данных на основа newColumn
.