Перекрестное соединение Pyspark между 2 фреймами данных с миллионами записей - PullRequest
0 голосов
/ 29 мая 2020

У меня есть 2 фрейма данных A (35 миллионов записей) и B (30000 записей)

A

|Text |
-------
| pqr  |
-------
| xyz  |
------- 

B

|Title |
-------
| a  |
-------
| b  |
-------
| c  |
------- 

Нижний фрейм данных C получается после перекрестного соединения между A и B.

c = A.crossJoin(B, on = [A.text == B.Title)

C

|text | Title |
---------------
| pqr  | a    |
---------------
| pqr  | b    |
---------------
| pqr  | c    |
---------------
| xyz  | a    |
---------------
| xyz  | b    |
---------------
| xyz  | c    |
---------------

Оба столбцы выше относятся к типу String.

Я выполняю указанную ниже операцию, и это приводит к ошибке Spark (задание прервано из-за сбоя этапа)

display(c.withColumn("Contains", when(col('text').contains(col('Title')), 1).otherwise(0)).filter(col('Contains') == 0).distinct())

Любые предложения о том, как это соединение должно быть выполнено, чтобы избежать Spark error () для результирующих операций?

Сообщение об ошибке Spark

1 Ответ

0 голосов
/ 29 мая 2020

попробуйте использовать broadcast объединений

from pyspark.sql.functions import broadcast
c = functions.broadcast(A).crossJoin(B)

Если вам не нужен дополнительный столбец «Содержит», вы можете просто отфильтровать его как

display(c.filter(col("text").contains(col("Title"))).distinct())
...