Pyspark - подсчитать длину новинки - PullRequest
0 голосов
/ 27 февраля 2020

В моем фрейме данных PySpark (<2.4) у меня есть два списка. Я хочу посчитать новые элементы в List1, которых нет в List2 </p>

data = [(("ID1", ['A', 'B'], ['A', 'C'])), (("ID2", ['A', 'B'], ['A', 'B'])), (("ID2", ['A', 'B'], None))]
df = spark.createDataFrame(data, ["ID", "List1", "List2"])
df.show(truncate=False)

+---+------+------+
|ID |List1 |List2 |
+---+------+------+
|ID1|[A, B]|[A, C]|
|ID2|[A, B]|[A, B]|
|ID2|[A, B]|null  |
+---+------+------+

. В настоящее время я написал UDF, который может дать мне ответ. Я проверяю, могу ли я сделать это без UDF.

Текущее решение

def sum_list(x, y):
    total = 0
    if y is None:
      total = 0

    elif x is None and y is not None:
      total = len(y)

    else:
      lst = [1 for item in y if item not in x]
      total = len(lst)

    return total

new_udf = udf(sum_list , IntegerType())
df = df.withColumn('new_count', new_udf('List2', 'List1'))
df.show()

+---+------+------+---------+
| ID| List1| List2|new_count|
+---+------+------+---------+
|ID1|[A, B]|[A, C]|        1|
|ID2|[A, B]|[A, B]|        0|
|ID2|[A, B]|  null|        2|
+---+------+------+---------+

Ответы [ 2 ]

1 голос
/ 27 февраля 2020

Используя pyspark <2.4, вы можете комбинировать <code>explode, groupby и array_contain:

df = df.select('ID', 'List1', 'List2', F.explode('List1').alias('list1_explode'))
df = df.groupby('ID', 'List1', 'List2').agg((F.sum(F.when(F.expr("array_contains(List2, list1_explode)"),0).otherwise(1))).alias('new_count'))
df.show()

+---+------+------+---------+
| ID| List1| List2|new_count|
+---+------+------+---------+
|ID2|[A, B]|[A, B]|        0|
|ID2|[A, B]|  null|        2|
|ID1|[A, B]|[A, C]|        1|
+---+------+------+---------+
0 голосов
/ 27 февраля 2020

Вы можете использовать array_except. Но Spark> = 2.4.0.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array_except

spark = SparkSession.builder.appName("test").getOrCreate()
data = [(("ID1", ['A', 'B'], ['A', 'C'])), (("ID2", ['A', 'B'], ['A', 'B'])), (("ID2", ['A', 'B'], None))]
df = spark.createDataFrame(data, ["ID", "List1", "List2"])
df.show()

df.withColumn('new_count', when(df.List2.isNull(), size(df.List1)).otherwise(size(array_except('List1', 'List2')))).show()
...