Как передать каждое значение столбца Spark Dataframe в виде строки в UDF Python? - PullRequest
0 голосов
/ 18 ноября 2018

Я пытаюсь GPG зашифровать столбец данных в искровом фрейме FName

df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName", "City"))

Я создал udf, который принимает строковое значение в качестве входных данных и дает зашифрованную строку в качестве выходных.

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
encrypt_str = udf(lambda string_value: gpg.encrypt(string_value, 'myrecepeintemailid', passphrase='mypassphrase'))

Я применяю свой udf, как показано ниже:

df = df.withColumn('Encrypted_FName', encrypt_str(col('FName')))

Но, я предполагаю, что весь столбец проходит и не шифрует значение правильно.

Как перебрать каждое значение в dataframe ипередать его как string_value на udf?

Ответы [ 2 ]

0 голосов
/ 19 ноября 2018

Вы можете сделать это, создав новый фрейм данных.

У меня был похожий вопрос к столбцу, который нужно было хэшировать. Функция python определяется следующим образом:

def make_hash(txt):
    import hashlib
    m = hashlib.sha256()
    m.update(txt.encode())
    print ("hashed ", m)
    return m.hexdigest()  

Определил udf:

from pyspark.sql.functions import udf
u_make_hash = udf(make_hash)    

И создал новый DataFrame со всеми столбцами, кроме хэшированного столбца:

streamingOutputDF = streamingInputDF.select(u_make_hash(streamingInputDF['connectionDeviceId']).alias("Id"), streamingInputDF['*']) \
                                    .drop("connectionDeviceId")   

Я не проверял ваш udf, при условии, что все в порядке, следующее заявление должно сделать это:

dfnew = df.select((encrypt_str['FName']).alias("Encrypted_FName"))
0 голосов
/ 18 ноября 2018

Попробуйте DataFrame.columns с петлей

for col_name in df.columns:
    df = df.withColumn('Encrypted_{}'.format(col_name), encrypt_str(col(col_name)))
...