Итерация по dict RDD и присвоение значения столбцу данных - PullRequest
0 голосов
/ 14 ноября 2018

Итак, у меня есть датафрейм df, вот так,

+---+-----+
| ID|COL_A|
+---+-----+
|  1|  123|
+---+-----+

У меня тоже есть что-то вроде:

{"COL_B":"abc","COL_C":""}

Теперь мне нужно обновить df ключами в dict, являющимися новым именем столбца, и значением ключа, являющимся стоимостным значением столбца.

Ожидаемый df должен быть таким:

+---+-----+-----+-----+
| ID|COL_A|COL_B|COL_C|
+---+-----+-----+-----+
|  1|  123|  abc|     |
+---+-----+-----+-----+

Теперь вот мой Python-код, чтобы сделать это, который работает нормально ...

input_data = pd.read_csv(inputFilePath,dtype=str)
for key, value in mapRow.iteritems():   #mapRow is the dict
        if value is None:
                input_data[key] = ""
        else:
                input_data[key] = value

Теперь я переношу этот код в pyspark и хотел бы узнать, как это сделать в pyspark ?

Спасибо за помощь.

1 Ответ

0 голосов
/ 14 ноября 2018

Для объединения RDD мы используем zip или join.Ниже приведено объяснение с использованием zip.zip - для их объединения и map для выравнивания.

from pyspark.sql import Row

rdd_1 = sc.parallelize([Row(ID=1,COL_A=2)])
rdd_2 = sc.parallelize([Row(COL_B="abc",COL_C=" ")])

result_rdd = rdd_1.zip(rdd_2).map(lamda x: [j for i in x for j in i])

ПРИМЕЧАНИЕ В настоящее время у меня не было payspark, поэтому это не проверено.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...