pyspark генерирует хэш строки определенных столбцов и добавляет его в качестве нового столбца - PullRequest
0 голосов
/ 12 сентября 2018

Я работаю со свечами 2.2.0 и pyspark2.

Я создал DataFrame df и теперь пытаюсь добавить новый столбец "rowhash", который является хэшем sha2 определенных столбцов в DataFrame.

Например, скажем, что df имеет столбцы: (column1, column2, ..., column10)

Мне требуется sha2((column2||column3||column4||...... column8), 256) в новом столбце "rowhash".

Пока я пытался использовать следующие методы:

1) Используется hash() функция, но, поскольку она дает целочисленный вывод, она не очень полезна

2) Попытка с использованием функции sha2(), но она не работает.

Скажите, columnarray имеет массив столбцов, которые мне нужны.

def concat(columnarray):
    concat_str = ''
    for val in columnarray:
        concat_str = concat_str + '||' + str(val) 
    concat_str = concat_str[2:] 
    return concat_str 

, а затем

df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))

Это ошибка с ошибкой «не удается разрешить».

Спасибо, gaw за ваш ответ. Поскольку мне нужно хэшировать только определенные столбцы, я создал список имен этих столбцов (в hash_col) и изменил вашу функцию следующим образом:

 def sha_concat(row, columnarray):
   row_dict = row.asDict()      #transform row to a dict
   concat_str = '' 
   for v in columnarray: 
       concat_str = concat_str + '||' + str(row_dict.get(v)) 
   concat_str = concat_str[2:] 
   #preserve concatenated value for testing (this can be removed later)
   row_dict["sha_values"] = concat_str  
   row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
   return Row(**row_dict) 

Затем передается как:

    df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)

Однако теперь происходит сбой с ошибкой:

    UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)

Я вижу значение \ ufffd в одном из столбцов, поэтому я не уверен, есть ли способ справиться с этим?

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Вы можете использовать pyspark.sql.functions.concat_ws() для объединения ваших столбцов и pyspark.sql.functions.sha2() для получения хэша SHA256.

Использование данных @gaw:

from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
    [(1,"2",5,1),(3,"4",7,8)],
    ("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2                                                        |
#+----+----+----+----+----------------------------------------------------------------+
#|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3   |4   |7   |8   |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+

Вы можете передать 0 или 256 в качестве второго аргумента sha2(), в соответствии с документацией:

Возвращает результат шестнадцатеричной строки семейства хэш-функций SHA-2 (SHA-224, SHA-256, SHA-384 и SHA-512). NumBits указывает желаемую битовую длину результата, которая должна иметь значение 224, 256, 384, 512 или 0 (что эквивалентно 256).

Функция concat_ws принимает разделитель и список столбцов для объединения. Я передаю || как разделитель и df.columns как список столбцов.

Я использую все столбцы здесь, но вы можете указать любое подмножество столбцов, которое вы хотите - в вашем случае это будет columnarray. (Вам нужно использовать *, чтобы распаковать список.)

0 голосов
/ 12 сентября 2018

Если вы хотите иметь хеш для каждого значения в разных столбцах вашего набора данных, вы можете применить самодельную функцию через map к rdd вашего фрейма данных.

import hashlib
test_df = spark.createDataFrame([
    (1,"2",5,1),(3,"4",7,8),              
    ], ("col1","col2","col3","col4"))

def sha_concat(row):
    row_dict = row.asDict()                             #transform row to a dict
    columnarray = row_dict.keys()                       #get the column names
    concat_str = ''
    for v in row_dict.values():
        concat_str = concat_str + '||' + str(v)         #concatenate values
    concat_str = concat_str[2:] 
    row_dict["sha_values"] = concat_str                 #preserve concatenated value for testing (this can be removed later)
    row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest() #calculate sha256
    return Row(**row_dict)

test_df.rdd.map(sha_concat).toDF().show(truncate=False)

Результаты будут выглядеть так:

+----+----+----+----+----------------------------------------------------------------+----------+
|col1|col2|col3|col4|sha_hash                                                        |sha_values|
+----+----+----+----+----------------------------------------------------------------+----------+
|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|1||2||5||1|
|3   |4   |7   |8   |cb8f8c5d9fd7165cf3c0f019e0fb10fa0e8f147960c715b7f6a60e149d3923a5|8||4||7||3|
+----+----+----+----+----------------------------------------------------------------+----------+
...