Spark: загрузка нескольких файлов, выполнение одной и той же операции и объединение в один фрейм - PullRequest
0 голосов
/ 19 апреля 2020

У меня много маленьких, индивидуальных .txt файлов. Для каждого из этих файлов у меня есть несколько строк, разделенных пробелом на 2 столбца: start_time и end_time (число с плавающей точкой).

Я хотел бы:

  • загрузить все TXT-файлы
  • для каждой строки вычисляют новый столбец, который содержит (end_time - start_time)
  • для каждой строки, добавляют новый столбец с именем файла
  • In В конце я хочу получить один dataFrame с этой схемой:
+------------+--------------+------------+------------+
|  file_name |   start_time |   end_time |   duration |
+------------+--------------+------------+------------+

Я знаю, что могу просто сделать al oop для каждого файла и каждой строки и добавить одну единственную строку в время для кадра данных, но я хотел бы знать, есть ли более быстрый способ сделать это.
Меня не интересует порядок, в котором все делается, но скорость конечного результата. Я вижу, что существующие функции, такие как textFile () и wholeTextFiles () , представлены в SparkContext, но я не мог понять, как их использовать, чтобы выполнить то, что я хочу.

Любые указания или рекомендации приветствуются!

(Извините за мой бедный Энгли sh)

Обновление:

Спасибо @ Шу за помощь, это последний код, который я использую для решения моей проблемы

from pyspark.sql.functions import split, reverse, input_file_name

original_schema = [StructField("Start", FloatType(), True),
                    StructField("End", FloatType(), True)]

data_structure = StructType(original_schema)

df = self.spark_session.read.\
    csv(path=PATH_FILES+'\\*.txt', header=False, schema=data_structure, sep='\t').\
    withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).\
    withColumn("duration", col("End") - col("Start"))

df.show(20, False)

Ответы [ 2 ]

1 голос
/ 19 апреля 2020

Считайте файл, используя spark.read.csv() и Если ваши столбцы разделены space, используйте .option("delimiter"," ").

  • используйте input_file_name функцию чтобы получить имя файла.

Пример:

from pyspark.sql.functions import *

spark.read.option("header",true).\
option("delimiter"," ").\
csv("<path>").\
withColumn("file_name",input_file_name).\
withColumn("duration",col("end_time") - col("start_time")).show()

В случае, если строки разделены space, прочитайте данные с некоторыми разделитель, которого нет в файле.

  • Затем разделите данные с помощью \\s+ и разбейте их, теперь мы получим данные в строки данных.

  • Использование подстроки Извлечение функции start_time,end_time и вычитание их для получения продолжительности.


spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",input_file_name()).\
drop("_c0").\
show()

UPDATE

Using array index:

spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/'))[0]).\
drop("_c0").\
show()
#or
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).\
drop("_c0").\
show()

From Spark-2.4+ Using element_at:

spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",element_at(split(input_file_name(),'/'),-1)).\
drop("_c0").\
show()
0 голосов
/ 19 апреля 2020

Просто еще один аналогичный подход в Scala - чтение файла с использованием spark.read.csv () с разделителем в качестве пробела и именования имен файлов как (при условии, что spark -> spark сессия уже присутствует)

val inputDF = spark.read
      .option("inferSchema", "true")
      .option("delimiter", " ")
      .csv("<path>")
    .toDF("start_time","end_time")

 val output = inputDF
     .withColumn("duration", col("end_time") - col("start_time"))
     .withColumn("input_file_name", input_file_name())
     .withColumn("file_name_splits", split(col("input_file_name"), "/"))
     // Getting the last element from the splits using size function
     .withColumn("file_name", col("file_name_splits").apply(size(col("file_name_splits")).minus(1)))
     .select("file_name", "start_time", "end_time", "duration")

// To show the sample data
output.show(false)
...