Как найти среднее значение столбца массива на основе индекса в pyspark - PullRequest
0 голосов
/ 21 июня 2019

У меня есть данные, как показано ниже

-----------------------------
place  | key        | weights
----------------------------
amazon | lion       | [ 34, 23, 56 ]
north  | bear       | [ 90, 45]
amazon | lion       | [ 38, 30, 50 ]
amazon | bear       | [ 45 ]
amazon | bear       | [ 40 ]

Я пытаюсь получить результат, как показано ниже

-----------------------------
place  | key        | average
----------------------------
amazon | lion1      | 36.0      #(34 + 38)/2
amazon | lion2      | 26.5      #(23 + 30)/2
amazon | lion3      | 53.0      #(50 + 56)/2
north  | bear1      | 90        #(90)/1
north  | bear2      | 45        #(45)/1
amazon | bear1      | 42.5      #(45 + 40)/2

Я понимаю, что сначала я должен выполнить групповую операцию для столбцов place и key, а затем я должен взять среднее значение для элементов массива на основе индексов. Например, lion1 является первым индексным элементом в массивах [ 34, 23, 56 ] и [ 38, 30, 50 ].

У меня уже есть решение, использующее posexplode, но проблема в реальных данных weights Размер столбца массива очень велик, так как posexplode добавляет больше строк, размер данных значительно увеличился с 10 миллионов строк до 1,2 миллиарда и не в состоянии вычислить в надежное время на текущем кластере.

Я думаю, что лучше добавить больше столбцов, чем строк, а затем отключить их, но я понятия не имею, как этого добиться с помощью pyspark или spark SQL 2.2.1.

Ответы [ 2 ]

0 голосов
/ 21 июня 2019

Максимальное количество элементов в столбце массива можно найти с помощью functions.size () , а затем разверните этот столбец:

  1. настроить данные

    from pyspark.sql import functions as F
    
    df = spark.createDataFrame([    
          ('amazon', 'lion', [ 34, 23, 56 ])
        , ('north',  'bear', [ 90, 45])
        , ('amazon', 'lion', [ 38, 30, 50 ])
        , ('amazon', 'bear', [ 45 ])    
        , ('amazon', 'bear', [ 40 ])
    ], ['place', 'key', 'average'])
    
  2. Найти максимальное количество элементов в поле массива 'Среднее'

    n = df.select(F.max(F.size('average')).alias('n')).first().n
    
    >>> n
    3
    
  3. Преобразовать столбец массива в n-столбцы

    df1 = df.select('place', 'key', *[F.col('average')[i].alias('val_{}'.format(i+1)) for i in range(n)])
    
    >>> df1.show()
    +------+----+-----+-----+-----+
    | place| key|val_1|val_2|val_3|
    +------+----+-----+-----+-----+
    |amazon|lion|   34|   23|   56|
    | north|bear|   90|   45| null|
    |amazon|lion|   38|   30|   50|
    |amazon|bear|   45| null| null|
    |amazon|bear|   40| null| null|
    +------+----+-----+-----+-----+
    
  4. Рассчитать среднее агрегирование по новым столбцам

    df2 = df1.groupby('place', 'key').agg(*[ F.mean('val_{}'.format(i+1)).alias('average_{}'.format(i+1)) for i in range(n)])
    
    >>> df2.show()
    +------+----+---------+---------+---------+
    | place| key|average_1|average_2|average_3|
    +------+----+---------+---------+---------+
    |amazon|bear|     42.5|     null|     null|
    | north|bear|     90.0|     45.0|     null|
    |amazon|lion|     36.0|     26.5|     53.0|
    +------+----+---------+---------+---------+
    
  5. Отключить столбцы, используя select + union + lower

    from functools import reduce
    
    df_new = reduce(lambda x,y: x.union(y), [
        df2.select('place', F.concat('key', F.lit(i+1)).alias('key'), F.col('average_{}'.format(i+1)).alias('average')) \
           .dropna(subset=['average']) for i in range(n)
    ])
    
    >>> df_new.show()
    +------+-----+-------+
    | place|  key|average|
    +------+-----+-------+
    |amazon|bear1|   42.5|
    | north|bear1|   90.0|
    |amazon|lion1|   36.0|
    | north|bear2|   45.0|
    |amazon|lion2|   26.5|
    |amazon|lion3|   53.0|
    +------+-----+-------+
    
0 голосов
/ 21 июня 2019

Один из вариантов - объединить все array s для заданного места, комбинацию клавиш в массив. В этом массиве массивов вы можете использовать udf, который вычисляет желаемое среднее значение и, наконец, posexplode, чтобы получить желаемый результат.

from pyspark.sql.functions import collect_list,udf,posexplode,concat
from pyspark.sql.types import ArrayType,DoubleType

#Grouping by place,key to get an array of arrays
grouped_df = df.groupBy(df.place,df.key).agg(collect_list(df.weights).alias('all_weights'))

#Define UDF
zip_mean = udf(lambda args: [sum(i)/len(i) for i in zip(*args)],ArrayType(DoubleType()))

#Apply UDF on the array of array column
res = grouped_df.select('*',zip_mean(grouped_df.all_weights).alias('average'))

#POS explode to explode the average values and get the position for key concatenation
res = res.select('*',posexplode(res.average))

#Final result
res.select(res.place,concat(res.key,res.pos+1).alias('key'),res.col).show()
...