pyspark условно разобрать текстовый файл фиксированной ширины - PullRequest
0 голосов
/ 18 декабря 2018

Таким образом, у меня есть файл фиксированной ширины, и я не буду знать его формат, пока определенная переменная в нем не проверит, является ли определенная переменная '01' или '02'.Поэтому я пытаюсь создать что-то вроде этого:

myreport= spark.read.text("/mnt/path/mydata")
myreport= myreport.select(myreport.value.substr(1,3).alias('client'),
myreport.value.substr(4,2).alias('rptnum'),
if rptnum = '01', then
myreport.value.substr(6,2).cast('integer').alias('mo1'),
myreport.value.substr(8,2).cast('integer').alias('mo2'),
myreport.value.substr(12,2).cast('integer').alias('mo3'),
Else
myreport.value.substr(6,2).cast('integer').alias('mo1'),
myreport.value.substr(8,2).cast('integer').alias('mo2'),
myreport.value.substr(12,2).cast('integer').alias('mo3'),
myreport.value.substr(14,2).cast('integer').alias('mo4'),
myreport.value.substr(16,2).cast('integer').alias('mo5'),
myreport.value.substr(18,2).cast('integer').alias('mo6'),

В основном количество столбцов удваивается, если число rpt не 01. Довольно неуверенно, как это сделать в pyspark

1 Ответ

0 голосов
/ 19 декабря 2018

Вы должны написать функцию, которая будет вызываться из df.rdd.map(), и преобразовывать / анализировать каждую строку.Вы можете создать такое же количество столбцов, но в одном случае некоторые из столбцов будут нулевыми.Используя filter() в rptnum, вы можете выделить строки и выбрать соответствующие столбцы.

from pyspark.sql.functions import *
from pyspark.sql import *

def transformRow(row):
    value = row['value']
    client = value[1:4]
    rptnum = value[4:6]
    rowDict = {'client': client, 'rptnum': rptnum,'mo1': None,'mo2': None,'mo3': None,'mo4': None,'mo5': None,'mo6': None}
    rowDict['mo1'] = value[6:8]
    rowDict['mo2'] = value[8:10]
    rowDict['mo3'] = value[10:12]

    if rptnum != '01' :
        rowDict['mo4'] = value[12:14]
        rowDict['mo5'] = value[14:16]
        rowDict['mo6'] = value[16:18]
    return Row(**rowDict)

myreport= spark.read.text("/mnt/path/mydata")
myreport = myreport.rdd.map(transformRow).toDF()

rpt1 = myreport.filter(col("rptnum") == '01').select("mo1","mo2","mo3")
rpt2 = myreport.filter(col("rptnum") != '01')
...