Строка репликации Pyspark на основе значения столбца - PullRequest
0 голосов
/ 29 июня 2018

Я хотел бы реплицировать все строки в моем DataFrame на основе значения данного столбца в каждой строке, а затем индексировать каждую новую строку. Предположим, у меня есть:

Column A Column B
T1       3
T2       2

Я хочу, чтобы результат был:

Column A Column B Index
T1       3        1
T1       3        2
T1       3        3
T2       2        1
T2       2        2

Мне удалось что-то подобное с фиксированными значениями, но не с помощью информации, найденной в столбце. Мой текущий рабочий код для фиксированных значений:

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))

Я пытался изменить:

lit(i) for i in range(1, 10) 

до

lit(i) for i in range(1, df['Column B'])

и добавьте его в мою функцию array ():

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

но это не работает (TypeError: объект 'Column' не может быть интерпретирован как целое число).

Как мне это реализовать?

Ответы [ 2 ]

0 голосов
/ 13 марта 2019
You can try this:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import *
    from pyspark.sql.types import ArrayType, IntegerType
    from pyspark.sql import functions as F
    df = spark.read.csv('/FileStore/tables/stack1.csv', header = 'True', inferSchema = 'True')

    w = Window.orderBy("Column A")
    df = df.select(row_number().over(w).alias("Index"), col("*"))

    n_to_array = udf(lambda n : [n] * n ,ArrayType(IntegerType()))
    df2 = df.withColumn('Column B', n_to_array('Column B'))
    df3= df2.withColumn('Column B', explode('Column B'))
    df3.show()
0 голосов
/ 30 июня 2018

К сожалению, вы не можете перебирать столбец , как это. Вы всегда можете использовать udf, но у меня есть решение не-udf hack , которое должно работать для вас, если вы используете Spark версии 2.1 или выше.

Хитрость заключается в том, чтобы воспользоваться pyspark.sql.functions.posexplode(), чтобы получить значение индекса. Мы делаем это, создавая строку, повторяя запятую Column B раз. Затем мы разделяем эту строку на запятую и используем posexplode для получения индекса.

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = 'SELECT '\
    '`Column A`,'\
    '`Column B`,'\
    'pos AS Index '\
    'FROM ( '\
        'SELECT DISTINCT '\
        '`Column A`,'\
        '`Column B`,'\
        'posexplode(split(repeat(",", `Column B`), ",")) '\
        'FROM df) AS a '\
    'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+

Примечание. Вам необходимо заключить имена столбцов в обратные кавычки, поскольку в них есть пробелы, как описано в этом посте: Как выразить столбец, имя которого содержит пробелы в Spark SQL

...