Как перебрать столбец массива в Pyspark при присоединении - PullRequest
1 голос
/ 09 января 2020

В pyspark у меня есть dataframe_a с:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple]       |
| Tom       | [mango, orange]      |
| Matteo    | [apple, banana]      | 

и dataframe_b с

+-----------+----------------------+
| key       | value                |
+-----------+----------------------+
| mango     | 1                    |
| apple     | 2                    |
| orange    | 3                    | 

, и я хочу создать новый столбец типа Array joined_result, который сопоставляет каждый элемент в array_of_str ( dataframe_a ) с его значением в dataframe_b , например:

+-----------+----------------------+----------------------------------+
| str1      | array_of_str         | joined_result                    |
+-----------+----------------------+----------------------------------+
| John      | [mango, apple]       | [1, 2]                           |
| Tom       | [mango, orange]      | [1, 3]                           |
| Matteo    | [apple, banana]      | [2]                              |

I я не уверен, как это сделать, я знаю, что могу использовать udf с лямбда-функцией, но мне не удается заставить его работать :( Помогите!

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType

# START EXTRACT OF CODE
ret = (df
  .select(['str1', 'array_of_str'])
  .withColumn('joined_result', F.udf(
     map(lambda x: ??????, ArrayType(StringType))
  )
)

return ret
# END EXTRACT OF CODE

Заранее спасибо

1 Ответ

1 голос
/ 09 января 2020

Мой ответ на ваш вопрос:

lookup_list = map(lambda row: row.asDict(), dataframe_b.collect())
lookup_dict = {lookup['key']:lookup['value'] for lookup in lookup_list}

def mapper(keys):
  return [lookup_dict[key][0] for key in keys]

dataframe_a = dataframe_a.withColumn('joined_result', F.udf(mapper)("arr_of_str"))

Работает, как вы хотите: -)

...