Счетчик в pyspark для проверки массива внутри массива с дубликатами - PullRequest
0 голосов
/ 28 марта 2019

у меня есть датафрейм df1

id     transactions
1      [1, 3,3,3,2,5]
2      [1,2]
root
 |-- id: int (nullable = true)
 |-- transactions: array (nullable = false)
      |-- element: string(containsNull = true)
None

у меня есть датафрейм df2

items         cost
[1, 3,3, 5]    2
[1, 5]      1

root
|-- items: array (nullable = false)
  |-- element: string (containsNull = true)
 |-- cost: int (nullable = true)
None

Я должен проверить, находятся ли позиции в транзакциях, если это так, то суммировать затраты. [1,3,3,5] в [1,3,3,3,5] истинно, а [1,3,3,5] в [1,2] ложно и т. Д.

результат должен быть

id     transactions   score
1      [1,3,3,3,5]    3
2      [1,2]          null

Я пытался взорвать и объединить (inner, left_semi), методы, но все это не удалось из-за дубликатов. Проверить все элементы массива, присутствующие в другом массиве pyspark issubset (), array_intersect () также не будет работать.

Я наткнулся на Python - проверка, является ли один список подмножеством другого . Я обнаружил, что следующее решает проблему и очень эффективно.

from collections import Counter
not Counter([1,3,3,3,5])-Counter([1,3,3,4,5])
False
>>> not Counter([1,3,3,3,5])-Counter([1,3,3,5])
False
>>> not Counter([1,3,3,5])-Counter([1,3,3,3,5])
True

я попробовал следующее

@udf("boolean")
def contains_all(x, y):
if x is not None and y is not None:
    return not (lambda y: dict(Counter(y)))-(lambda x: dict(Counter(x)))


(df1
.crossJoin(df2).groupBy("id", "transactions")
.agg(sum_(when(
    contains_all("transactions", "items"), col("cost")
)).alias("score"))
.show())

но выдает ошибку. Файл "", строка 39, в содержит_все Ошибка типа: неподдерживаемые типы операндов для -: 'function' и 'function'

Есть ли другой способ добиться этого?

1 Ответ

1 голос
/ 28 марта 2019

Только что обновил udf для хранения дубликатов и не уверен в производительности,

from pyspark.sql.functions import udf,array_sort,sum as sum_,when,col

dff = df1.crossjoin(df2)

dff = dff.withColumn('transaction',array_sort('transaction')).\
      withColumn('items',array_sort('items')) ## sorting here,it's needed in UDF

+---+---------------+------------+----+
| id|    transaction|       items|cost|
+---+---------------+------------+----+
|  1|[1, 2, 3, 3, 5]|[1, 3, 3, 5]|   2|
|  1|[1, 2, 3, 3, 5]|      [1, 5]|   1|
|  2|         [1, 2]|[1, 3, 3, 5]|   2|
|  2|         [1, 2]|      [1, 5]|   1|
+---+---------------+------------+----+

@udf('boolean')
def is_subset_w_dup(trans,itm):
    itertrans = iter(trans)
    return all(i in itertrans for i in itm)


dff.groupby('id','transaction').agg(sum_(when(is_subset_w_dup('transaction','items'),col('cost'))).alias('score')).show()

+---+---------------+-----+
| id|    transaction|score|
+---+---------------+-----+
|  2|         [1, 2]| null|
|  1|[1, 2, 3, 3, 5]|    3|
+---+---------------+-----+
...