Вот шаги
- Используйте sparkcontext.wholeTextFiles («/ путь / к / папке / содержащий / все / файлы»)
- Вышеупомянутое возвращает RDD, где ключ путь к файлу, а значение - это содержимое файла
- rdd.map (lambda x: x [1]) - это дает вам rdd только с содержимым файла
- rdd. map (lambda x: customeFunctionToProcessFileContent (x))
- , поскольку функция карты работает параллельно, любые выполняемые вами операции будут быстрее и непоследовательными - пока ваши задачи не зависят друг от друга, что является основные критерии параллелизма
import os
import time
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# reading all the files to create PairRDD
input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*",2)
#convert RDD to DF
input_df=spark.createDataFrame(input_rdd)
input_df.show(truncate=False)
'''
+---------------------------------------+------------+
|_1 |_2 |
+---------------------------------------+------------+
|file:/home/user/datatest/test.txt |1,2,3 1,2,3|
|file:/home/user/datatest/test.txt1 |4,5,6 6,7,6|
+---------------------------------------+------------+
'''
input_df.select("_2").take(2)
#[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]
# function to get a creation time of a file
def time_convesion(filename):
return time.ctime(os.path.getmtime(filename.split(":")[1]))
#udf registration
time_convesion_udf = udf(time_convesion, StringType())
#udf apply over the DF
final_df = input_df.withColumn("created_time", time_convesion_udf(input_df['_1']))
final_df.show(2,truncate=False)
'''
+---------------------------------------+------------+------------------------+
|_1 |_2 |created_time |
+---------------------------------------+------------+------------------------+
|file:/home/user/datatest/test.txt |1,2,3 1,2,3|Sat Jul 11 18:31:03 2020|
|file:/home/user/datatest/test.txt1 |4,5,6 6,7,6|Sat Jul 11 18:32:43 2020|
+---------------------------------------+------------+------------------------+
'''
# proceed with the next steps for the implementation
Вышеуказанное работает с разделом по умолчанию. Таким образом, вы можете не получить количество входных файлов, равное количеству выходных файлов (поскольку выход - это количество разделов).
Вы можете повторно разбить RDD на основе количества или любого другого уникального значения на основе ваших данных, поэтому вы в конечном итоге количество выходных файлов равно количеству входных. Этот подход будет иметь только параллелизм, но не даст производительности, достигнутой при оптимальном количестве разделов