Как сохранить каждую строку DataFrame в виде файла HDFS с помощью pyspark - PullRequest
0 голосов
/ 19 февраля 2020

У меня есть файл ctrl A с разделителями и имеет следующий заголовок:

имя файла ',' file_metadata ',' data_content ',' status ',' errortype ',' error_message '

Мне нужно вывести отдельные файлы в hdf для каждой записи файла в такие каталоги, как - basepath_errortype_filename / file. json, и содержимое файла будет столбцом data_content.

Отображение примера данных:

>>> ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
>>> ff_df .show()
+--------------+-------------+--------------------+------+-------------+--------------------+
|     file_name|file_metadata|        data_content|status|   error_type|       error_message|
+--------------+-------------+--------------------+------+-------------+--------------------+
|test_file.json|     metadata|{ "fruit": "Apple...|FAILED| INVALID_JSON|     could not parse|
|demo_file.json|     metadata|{ "fruit": "Apple...|FAILED|MISSING_RULES|No matching rules...|
+--------------+-------------+--------------------+------+-------------+--------------------+

Теперь мне нужны эти две строки как два файла в формате hdf, в папках / tmp / INVALID_JSON_test_file и / tmp / MISSING_RULES_demo_file. Я написал следующий код Pyspark, но я не получаю желаемого результата. Пожалуйста, помогите

def write_file(line)
 tokens=line.split("\x01")
 file_name=tokens[0]
 error_type=tokens[4]
 content=tokens[2]
 #define path to saved file
 file_name = %s + "/" + 
 directory_name = basePath"/"error_type"/"file_name
 return directory_name

# get the file content
ff_rdd = sc.textFile("/tmp/pyspark1.txt").map(lambda line: line.split("\x01"))
ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
content_df = ff_df.select("data_content")

file_path = sc.textFile("/tmp/pyspark1.txt").map(lambda line: write_file(line))
content_df.rdd.saveAsTextFile("file_path")```


1 Ответ

0 голосов
/ 20 февраля 2020

Пример ввода:

+--------------+-------------+-------------------+------+-------------+-----------------+
|     file_name|file_metadata|       data_content|status|   error_type|    error_message|
+--------------+-------------+-------------------+------+-------------+-----------------+
|test_file.json|     metadata|{ "fruit": "Apple"}|FAILED| INVALID_JSON|  could not parse|
|demo_file.json|     metadata|   { "fruit": "Ab"}|FAILED|MISSING_RULES|No matching rules|
+--------------+-------------+-------------------+------+-------------+-----------------+

Сначала мы объединим столбец error_type и столбец file_name (только имя файла, исключая расширение) для создания newColumn.

final_df = df_new.withColumn("newColumn",concat(col("error_type"),lit("_"),split("file_name","\.")[0]))

При запуске df_new.show(truncate=false) мы увидим пример вывода в следующем виде:

+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|file_name     |file_metadata|data_content       |status|error_type   |error_message    |newColumn              |
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|test_file.json|metadata     |{ "fruit": "Apple"}|FAILED|INVALID_JSON |could not parse  |INVALID_JSON_test_file |
|demo_file.json|metadata     |{ "fruit": "Ab"}   |FAILED|MISSING_RULES|No matching rules|MISSING_RULES_demo_file|
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+

Чтобы получить структуру каталогов в требуемом формате, например: Base_directory / INVALID_JSON_test_file, во время записи нам придется разбить final_df на основе созданного newColumn.

Мы можем написать, используя следующее:

final_df.select("data_content","newColumn").write.partitionBy("newColumn").save(FilePath)

По умолчанию будет записан файл паркета. Я не думаю, что будет возможно записать вывод в виде текстового файла, так как он не принимает несколько столбцов, нам потребуется записать newColumn вместе с data_content, потому что мы разделяем фрейм данных на основа newColumn.

...