Я бы прочитал файл как rdd
, чтобы воспользоваться zipWithIndex
для добавления индекса к вашим данным.
rdd = sc.textFile("temp.txt")
Теперь мы можем использовать усечающее деление для создания индекса, с помощью которогогруппировать записи вместе. Используйте этот новый индекс как key
для rdd
. Соответствующий values
будет кортежем заголовка, который может быть вычислен с использованием модуля и фактического значения. (Обратите внимание, что индекс, возвращаемый zipWithIndex
, будет в конце записи, поэтому мы используем row[1]
для деления / мода.)
Далее используем значение reduceByKey
до add
tuple
с вместе. Это даст вам набор ключей и значений (по порядку). Используйте map
, чтобы превратить это в Row
(чтобы сохранить заголовки столбцов и т. Д.).
Наконец, используйте toDF()
для преобразования в DataFrame. Вы можете использовать select(header)
, чтобы получить столбцы в нужном порядке.
from operator import add
from pyspark.sql import Row
header = ["name", "age", "city"]
df = rdd.zipWithIndex()\
.map(lambda row: (row[1]//3, (header[row[1]%3], row[0])))\
.reduceByKey(add)\
.map(lambda row: Row(**dict(zip(row[1][::2], row[1][1::2]))))\
.toDF()\
.select(header)
df.show()
#+----+---+-------+
#|name|age| city|
#+----+---+-------+
#| sam| 11|newyork|
#|eric| 22| texas|
#|john| 13| boston|
#+----+---+-------+