отображать значения в кадре данных из словаря с помощью pyspark - PullRequest
0 голосов
/ 14 мая 2018

Я хочу знать, как отобразить значения в определенном столбце в кадре данных.

У меня есть датафрейм, который выглядит так:

df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])

+-----+-------+
| col1|   col2|
+-----+-------+
|india|  japan|
|  usa|uruguay|
+-----+-------+

У меня есть словарь, из которого я хочу отобразить значения.

dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')])

Вывод, который я хочу получить:

+-----+-------+--------+--------+
| col1|   col2|col1_map|col2_map|
+-----+-------+--------+--------+
|india|  japan|     ind|     jpn|
|  usa|uruguay|      us|     urg|
+-----+-------+--------+--------+

Я пытался использовать lookup function, но он не работает. Выдает ошибку SPARK-5063. Вот мой подход, который потерпел неудачу:

def map_val(x):
    return dicts.lookup(x)[0]

myfun = udf(lambda x: map_val(x), StringType())

df = df.withColumn('col1_map', myfun('col1')) # doesn't work
df = df.withColumn('col2_map', myfun('col2')) # doesn't work

Ответы [ 2 ]

0 голосов
/ 25 августа 2018

Я думаю, что проще всего использовать простые dictionary и df.withColumn.

from itertools import chain
from pyspark.sql.functions import create_map, lit

simple_dict = {'india':'ind', 'usa':'us', 'japan':'jpn', 'uruguay':'urg'}

mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])

df = df.withColumn('col1_map', mapping_expr[df['col1']])\
       .withColumn('col2_map', mapping_expr[df['col2']])

df.show(truncate=False)
0 голосов
/ 14 мая 2018

udf way

Я бы посоветовал вам изменить список кортежей на dicts и широковещательно , чтобы он использовался в udf

dicts = sc.broadcast(dict([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]))

from pyspark.sql import functions as f
from pyspark.sql import types as t
def newCols(x):
    return dicts.value[x]

callnewColsUdf = f.udf(newCols, t.StringType())

df.withColumn('col1_map', callnewColsUdf(f.col('col1')))\
    .withColumn('col2_map', callnewColsUdf(f.col('col2')))\
    .show(truncate=False)

что должно дать вам

+-----+-------+--------+--------+
|col1 |col2   |col1_map|col2_map|
+-----+-------+--------+--------+
|india|japan  |ind     |jpn     |
|usa  |uruguay|us      |urg     |
+-----+-------+--------+--------+

способ соединения (медленнее, чем путь udf)

Все, что вам нужно сделать, это изменить значения rdd на dataframe и использовать два объединения с псевдонимами следующим образом

df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])

dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]).toDF(['key', 'value'])

from pyspark.sql import functions as f
df.join(dicts, df['col1'] == dicts['key'], 'inner')\
    .select(f.col('col1'), f.col('col2'), f.col('value').alias('col1_map'))\
    .join(dicts, df['col2'] == dicts['key'], 'inner') \
    .select(f.col('col1'), f.col('col2'), f.col('col1_map'), f.col('value').alias('col2_map'))\
    .show(truncate=False)

, который должен дать вам тот же результат

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...