Как обновить значение искрового датафрейма в python? - PullRequest
0 голосов
/ 19 ноября 2018

У меня есть искровой фрейм данных

        df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName","City "))

Здесь я пытаюсь создать новый фрейм данных с зашифрованным столбцом Fname. Функция шифрования PGP, приведенная ниже, принимает строку в качестве входных данных и дает зашифрованную строку в качестве выходных.

df.createOrReplaceTempView("Customer")

for line in spark.table("Customer").collect():
    gpg = gnupg.GPG()
    gpg.import_keys('/home/keys/key.txt')
    encry_str=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)
    print(encry_str)

Зашифрованная строка выглядит следующим образом:

-----BEGIN PGP MESSAGE-----
Version: GnuPG v2

hQEMAyBWi2alDMW+AQf5AQn4VdbVNEHMWKzXUFRUyW+m1vepxbN//ENHw0F3dzvd
NAldsgZzpCv5pPq4QLYFw4Sq0eWqOK6Ezg4VxvBIB1l5J4cGsx7kMX9sfCU8T7Be
xqF1ZtWoTOqHp/cwt5NQFi+D302kRfUjUooszOl1zHOp9uOP12WEa/eInoCGRza1
z+73TQ1/0lxieuVVsJu4CsQhEDG9atk+rD21sRsfsOEIAzgIaXyBekZJ0zaiLJCe
LSqum0HebBrl5VJ5yozoAlDAIt0+oXsG2JwqsWpoQFKkuQFsqYGJ61k1+nX/st7i
WWKUvWtjb1ABp3XhC+nT8LpZYCNGIkx0wxQCqcsjjNI/AVjRHvbZsrCfZpua+vdJ
Vv/i1ZKfq0r/FPKgspHdCtMx2/ZAEmVZ3paHM/RGuFm82ihQhXkT78Ik//EiZD5D
=mRs6
-----END PGP MESSAGE-----

Ожидаемый вывод

+-----+-----+--------------------+
|FName|City |           Encrypted|
+-----+-----+--------------------+
| Andy|   NY|-----BEGIN PGP ME...|
|  Bob|   PA|-----BEGIN PGP ME...|
|Cindy|   DC|-----BEGIN PGP ME...| 
+-----+-----+--------------------+

Я пытаюсь обновить столбец FName, но получаю исключение

line.FName=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)

Исключение: строка доступна только для чтения.

Как в приведенном выше кадре я могу добавлять / обновлять значения зашифрованных строк в соответствующие столбцы Dataframe?

1 Ответ

0 голосов
/ 19 ноября 2018

Вы должны думать о кадрах данных Spark и RDD как о ссылках / рецептах на базовые данные.Поэтому, если вы действительно хотите изменить данных, вам необходимо сначала преобразовать, а затем обновить / перезаписать существующие данные.

Для преобразования:

from pyspark.sql import Row

def mapper(row):
    # if row doesn't need updating, return original
    if row['my_test_column'] != 'some_test_value':
        return row

    row = row.asDict()
    row['updated_column'] = some_function(row['some_column'], ...)

    return Row(**row)

Всохранить:

df_updated.write.saveAsTable('my_schema.my_new_table')

Для обновления / перезаписи:

df_updated.write.mode('overwrite').saveAsTable('my_schema.my_table')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...