Pyspark - разбить большой текстовый файл на несколько файлов - PullRequest
0 голосов
/ 29 октября 2018

Мне нужно разбить большой текстовый файл в S3, который может содержать ~100 million records, на несколько файлов и сохранить отдельные файлы обратно в S3 как .txt файлы.Эти записи не разделены, и каждый столбец может быть идентифицирован на основе начальной и конечной позиций.Длина каждой записи варьируется в зависимости от «типа», то есть строки с фиксированной начальной / конечной позицией, и мне нужно разбить этот файл на несколько файлов в зависимости от значения «типа».

Например:

My name is Chris  age 45  
My name is Denni  age 46  
My name is Vicki  age 47  
My name is Denni  age 51  
My name is Chris  age 52

В приведенном выше примере предположим, что мой «тип записи» начинается с 12-й позиции и заканчивается на 17-й позиции.Из последовательности шагов:

1. I need to get a distinct list of record types, which in this case are "Chris", "Denni" and "Vicki"

2. I need to split this file into 3 files, one for each record type and save them with same name as record types. Chris.txt, Denni.txt and Vicki.txt

Желаемый результат:

Крис.txt:

My name is Chris  age 45  
My name is Chris  age 52 

Denni.txt:

My name is Denni  age 46  
My name is Denni  age 51

Vicki.txt:

My name is Vicki  age 47 

Я использую фреймы данных pyspark для достижения этой цели, и теперь у меня есть что-то вроде этого,

df_inter =df.select(df.value.substr(start,end).alias("Type"),df.value.alias("value"))

    df_types = df_inter.select("Type").distinct()
    type_count = df_types.count()

    while(i<type_count):
      type = df_types.select(df_types.Type).collect()[i][0]
      df_filtered = df_inter.filter(df_inter["Type"] == type)
      df_filtered.saveAsTextFile("path")
      i += 1

Текущий код работает, но требуется ~25 mins для обработки 2.5 gb file с 5 узлами r5.xlargeEMR кластер, и занимает гораздо больше времени, скажем, файл 25 GB.Я хотел бы понять, есть ли более эффективный способ сделать это и сократить время обработки.Ценю ваш вклад.

Ответы [ 2 ]

0 голосов
/ 14 ноября 2018

Вот, пожалуйста! Вы можете проанализировать ваш простой файл Mainframe в определенной позиции и сделать его разделителем как csv.

import csv

PlainTextfile  = 'InputFilePathLocation\Input_File.txt'
CSV_OutputFile = 'OutputFilePathLocation\Output_File.txt'
cols = [(0,2),(3,8),(8,10),(11,17),(17,22),(22,24)]

with open(PlainTextfile,'r') as fin, open(CSV_OutputFile, 'wt') as fout:
    writer = csv.writer(fout, delimiter=",", lineterminator="\n")
    for line in fin:
            line = line.rstrip()  # removing the '\n' and other trailing whitespaces
            data = [line[c[0]:c[1]] for c in cols]
            print("data:",data)
            writer.writerow(data)

Ваш выходной файл становится теперь:

My,name ,is,Chris , age ,45
My,name ,is,Denni , age ,46
My,name ,is,Vicki , age ,47
My,name ,is,Denni , age ,51
My,name ,is,Chris , age ,52

и затем вы можете загрузить этот csv-файл с разделителями в dataframe или RDD и использовать операцию фильтрации, чтобы разделить его на разные фреймы данных или записать в разные csv-файлы с помощью метода класса Writer. Дайте мне знать, если вам понадобится дополнительная информация по этому вопросу.

0 голосов
/ 30 октября 2018

Я предполагаю, что ваши данные разделены табуляцией. Вы можете загрузить целые данные в информационный кадр, как показано ниже:

df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "false") \
  .option("inferschema", "true") \
  .option("delimiter", '\t').load(PATH_TO_FILE)

+---+----+---+-----+---+---+
|_c0| _c1|_c2|  _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Chris|age| 45|
| My|name| is|Denni|age| 46|
| My|name| is|Vicki|age| 47|
| My|name| is|Denni|age| 51|
| My|name| is|Chris|age| 52|
+---+----+---+-----+---+---+

from pyspark.sql.functions import col

Then you can filter the dataframe data and split into multiple dataframe depending on your column value.

Chris_df=df.filter(col('_c3')=='Chris')
+---+----+---+-----+---+---+
|_c0| _c1|_c2|  _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Chris|age| 45|
| My|name| is|Chris|age| 52|
+---+----+---+-----+---+---+
Denni_df=df.filter(col('_c3')=='Denni')
+---+----+---+-----+---+---+
|_c0| _c1|_c2|  _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Denni|age| 46|
| My|name| is|Denni|age| 51|
+---+----+---+-----+---+---+
Vicki_df=df.filter(col('_c3')=='Vicki')

+---+----+---+-----+---+---+
|_c0| _c1|_c2|  _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Vicki|age| 47|
+---+----+---+-----+---+---+

Надеюсь, это сработает быстрее!

...