Pyspark - Как добиться параллелизма для данного алгоритма, который в противном случае менее производительный? - PullRequest
1 голос
/ 30 апреля 2019

У меня есть два кадра данных

df_1 = spark.createDataFrame(
[(1, [(1), (4), (2) ,(3)])],
("id", "transactions")
)

df_2 = spark.createDataFrame(
[([ (1),(2),(3)], 2.0) ],
("items", "cost")
)

У меня есть UDF, чтобы проверить, все ли элементы массива присутствуют в другом массиве. Я использую искру 2.2

 @udf("boolean")
 def contains_all(x, y):
    if x is not None and y is not None:
       return set(y).issubset(set(x))


costs=(df_1
.crossJoin(df_2.groupBy("id", "transactions")
.agg(sum_(when(
 contains_all("transactions", "items"), col("cost")
 )).alias("score")))

Поскольку один из фреймов данных (df_2) очень большой, приведенный выше код очень медленный (уже день и работает).

Как я могу выполнить вышеуказанные операции для параллельной работы? Должен ли я преобразовать кадры данных в RDD? Есть ли способ ускорить? Любой пример такого типа был бы великолепен.

...