Чтение файла фиксированной ширины с использованием схемы из файла json в pyspark - PullRequest
0 голосов
/ 17 декабря 2018

У меня есть файл фиксированной ширины, как показано ниже

00120181120xyz12341
00220180203abc56792
00320181203pqr25483 

И соответствующий файл JSON, который задает схему:

{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}

Я читаю файл схемы в DataFrame, используя:

SchemaFile = spark.read\
    .format("json")\
    .option("header","true")\
    .json('C:\Temp\schemaFile\schema.json')

SchemaFile.show()
#+------+----+---+
#|Column|From| To|
#+------+----+---+
#|    id|   1|  3|
#|  date|   4|  8|
#|  name|  12|  3|
#|salary|  15|  5|
#+------+----+---+

Аналогично, я анализирую файл фиксированной ширины в фрейме данных pyspark, как показано ниже:

File = spark.read\
    .format("csv")\
    .option("header","false")\
    .load("C:\Temp\samplefile.txt")

File.show()
#+-------------------+
#|                _c0|
#+-------------------+
#|00120181120xyz12341|
#|00220180203abc56792|
#|00320181203pqr25483|
#+-------------------+

Очевидно, что я могу жестко запрограммировать значения позиций и длин каждого столбца, чтобы получитьжелаемый результат:

from pyspark.sql.functions import substring
data = File.select(
    substring(File._c0,1,3).alias('id'),
    substring(File._c0,4,8).alias('date'),
    substring(File._c0,12,3).alias('name'),
    substring(File._c0,15,5).alias('salary')
)

data.show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+

Но как я могу использовать SchemaFile DataFrame, чтобы указать ширину и имена столбцов для строк, чтобы схема могла применяться динамически (без жесткого кодирования) во время выполнения?

1 Ответ

0 голосов
/ 18 декабря 2018

Здесь проще всего сделать collect содержимое SchemaFile и выполнить цикл по его строкам для извлечения нужных данных.

Сначала прочитайте файл схемы как JSON в DataFrame.Затем вызовите метод collect и сопоставьте каждую строку со словарем:

sfDict = map(lambda x: x.asDict(), SchemaFile.collect())
print(sfDict)
#[{'Column': u'id', 'From': u'1', 'To': u'3'},
# {'Column': u'date', 'From': u'4', 'To': u'8'},
# {'Column': u'name', 'From': u'12', 'To': u'3'},
# {'Column': u'salary', 'From': u'15', 'To': u'5'}]

Теперь вы можете перебирать строки в sfDict и использовать значения для подстроки столбца:

from pyspark.sql.functions import substring
File.select(
    *[
        substring(
            str='_c0',
            pos=int(row['From']),
            len=int(row['To'])
        ).alias(row['Column']) 
        for row in sfDict
    ]
).show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+

Примечаниечто мы должны привести To и From к целым числам, поскольку они указаны в виде строк в вашем файле json.

...