Как преобразовать линейный список PySpark RDD в DataFrame? - PullRequest
0 голосов
/ 07 апреля 2019

Я хотел бы преобразовать линейный список в фрейм данных. то есть, учитывая следующий список,

a = ["a1", "a2", "a3", b1", "b2", "b3", "c1", "c2", "c3"]

Ожидаемый результат:

+--------------------+
| col1 | col2 | col3 |
+--------------------+
|  a1  |  a2  |  a3  |
|  b1  |  b2  |  b3  |
|  c1  |  c2  |  c3  |
+--------------------+

Я попробовал следующее, но получил ошибку.

from pyspark.sql.types import *

a = ["a1", "a2", "a3", "b1", "b2", "b3", "c1", "c2", "c3"]

rdd = sc.parallelize(a)

schema = StructType([
     StructField("a", StringType(), True),
     StructField("b", StringType(), True),
     StructField("c", StringType(), True)
     ])

df = sqlContext.createDataFrame(rdd, schema)

df.show()

Последний оператор show () получает ошибку "Задание прервано из-за сбоя этапа". Пожалуйста, кто-нибудь подскажет решение? Благодаря.

Ответы [ 2 ]

1 голос
/ 09 апреля 2019

Исходя из вашего комментария , я предполагаю, что вы начинаете с rdd, а не со списка.

Я также предполагаю, что вы определяете порядок на основе индекса rdd. Если эти предположения верны, вы можете использовать zipWithIndex() для добавления номера строки к каждой записи.

Затем разделите номер строки на 3 (используйте целочисленное деление), чтобы сгруппировать каждые 3 последовательных записи. Затем используйте groupByKey() для объединения записей с тем же key в кортеж.

Наконец, бросьте ключ и позвоните toDF()

rdd.zipWithIndex()\
    .map(lambda row: (row[1]//3, row[0]))\
    .groupByKey()\
    .map(lambda row: tuple(row[1]))\
    .toDF(["a", "b", "c"])\
    .show()
#+---+---+---+
#|  a|  b|  c|
#+---+---+---+
#| a1| a2| a3|
#| c1| c2| c3|
#| b1| b2| b3|
#+---+---+---+
0 голосов
/ 09 апреля 2019

Вот способ, который, как мы надеемся, должен соответствовать вашим критериям

# First get a 1 column DF
df = sql.createDataFrame(sc.parallelize(a).map(lambda x: [x]), schema=['col'])
# split each value into a number and letter e.g. 'a1' --> ['a','1']) 
df = df.withColumn('letter', f.split('col', '').getItem(0))
df = df.withColumn('number', f.split('col', '').getItem(1))

# Now pivot to get what you want (dropping extraneous columns and ordering 
# to get exact output

output = (df.groupBy('letter')
          .pivot('number')
          .agg(f.first('col'))
          .select([f.col(column).alias('col%s'%(column)) for column in ['1','2','3']])
          .orderBy('col1')
          .drop('letter'))
...