Могу ли я получить метаданные файлов, читаемых Spark - PullRequest
1 голос
/ 11 июля 2020

Предположим, у нас есть 2 файла, файл №1 создан в 12:55, а файл №2 создан в 12:58. Читая эти два файла, я хочу добавить новый столбец «creation_time». Строки, принадлежащие файлу № 1, имеют 12:55 в столбце «creation_time», а строки, принадлежащие файлу № 2, имеют 12:58 в «creation_time».

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

Я использую приведенный выше фрагмент кода для чтения файлы во "входном" каталоге.

Ответы [ 2 ]

3 голосов
/ 11 июля 2020

Используйте функцию input_file_name(), чтобы получить имя файла, а затем используйте hdfs file api, чтобы получить временную метку файла, наконец, соедините оба фрейма данных на filename.

Example:

from pyspark.sql.types import *
from pyspark.sql.functions import *
URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())

status = fs.listStatus(Path('<hdfs_directory>'))

filestatus_df=spark.createDataFrame([[str(i.getPath()),i.getModificationTime()/1000] for i in status],["filename","modified_time"]).\
withColumn("modified_time",to_timestamp(col("modified_time")))

input_df=spark.read.csv("<hdfs_directory>").\
withColumn("filename",input_file_name())

#join both dataframes on filename to get filetimestamp
df=input_df.join(filestatus_df,['filename'],"left")
2 голосов
/ 11 июля 2020

Вот шаги

  1. Используйте sparkcontext.wholeTextFiles («/ путь / к / папке / содержащий / все / файлы»)
  2. Вышеупомянутое возвращает RDD, где ключ путь к файлу, а значение - это содержимое файла
  3. rdd.map (lambda x: x [1]) - это дает вам rdd только с содержимым файла
  4. rdd. map (lambda x: customeFunctionToProcessFileContent (x))
  5. , поскольку функция карты работает параллельно, любые выполняемые вами операции будут быстрее и непоследовательными - пока ваши задачи не зависят друг от друга, что является основные критерии параллелизма
import os
import time

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# reading all the files to create PairRDD 
input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*",2)

#convert RDD to DF

input_df=spark.createDataFrame(input_rdd)

input_df.show(truncate=False)
'''
+---------------------------------------+------------+
|_1                                     |_2          |
+---------------------------------------+------------+
|file:/home/user/datatest/test.txt      |1,2,3  1,2,3|
|file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|
+---------------------------------------+------------+
'''
input_df.select("_2").take(2)
#[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]


# function to get a creation time of a file
def time_convesion(filename):
    return time.ctime(os.path.getmtime(filename.split(":")[1]))

#udf registration
time_convesion_udf = udf(time_convesion, StringType())

#udf apply over the DF
final_df = input_df.withColumn("created_time", time_convesion_udf(input_df['_1']))

final_df.show(2,truncate=False)
'''
+---------------------------------------+------------+------------------------+
|_1                                     |_2          |created_time            |
+---------------------------------------+------------+------------------------+
|file:/home/user/datatest/test.txt      |1,2,3  1,2,3|Sat Jul 11 18:31:03 2020|
|file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|Sat Jul 11 18:32:43 2020|
+---------------------------------------+------------+------------------------+
'''
# proceed with the next steps for the implementation

Вышеуказанное работает с разделом по умолчанию. Таким образом, вы можете не получить количество входных файлов, равное количеству выходных файлов (поскольку выход - это количество разделов).

Вы можете повторно разбить RDD на основе количества или любого другого уникального значения на основе ваших данных, поэтому вы в конечном итоге количество выходных файлов равно количеству входных. Этот подход будет иметь только параллелизм, но не даст производительности, достигнутой при оптимальном количестве разделов

...