How do I convert a semicolon-separated string to MapType in pyspark?
0 votes
/ 09 October 2018

Sample data:

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customtargeting                                                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9                                                                                                         |
|nocid=no;store=3084;tppid=4cd36fde-c59a-41d2-a2b4-b731b6cfbe05                                                                                                         |
|nocid=no;tppid=c688c1be-a9c5-47a2-8c09-aef175a19847                                                                                                                    |
|nocid=yes;search=washing liquid;store=3060                                                                                                                             |
|pos=top;tppid=278bab7b-d40b-4783-8f89-bef94a9f5150                                                                                                                     |
|pos=top;tppid=00bb87fa-f3f5-4b0e-bbf8-16079a1a5efe                                                                                                                     |
|nocid=no;shelf=cleanser-toner-and-face-mask;store=2019;tppid=84006d41-eb63-4ae1-8c3c-3ac9436d446c                                                                      |
|pos=top;tppid=ed02b037-066b-46bd-99e6-d183160644a2                                                                                                                     |
|nocid=yes;search=salad;store=3060                                                                                                                                      |
|pos=top;nocid=no;store=2882;tppid=164563e4-8e5c-4366-a5a8-438ffb10da9d                                                                                                 |
|nocid=yes;search=beer;store=3060                                                                                                                                       |
|nocid=no;search=washing capsules;store=5528;tppid=4f9b99eb-65ff-4fbc-b11c-b0552b7f158d                                                                                 |
|pos=right;tppid=ddb54247-a5c9-40a0-9f99-8412d8542b4c                                                                                                                   |
|nocid=yes;search=bedding;store=3060                                                                                                                                    |
|pos=top                                                                                                                                                                |
|pos=mpu1;keywords=helium canisters;keywords=tesco.com;keywords=helium canisters reviews;keywords=tesco;keywords=helium canisters uk;keywords=balloons;pagetype=category|

I want to convert a string column of a PySpark DataFrame into a map type. The column can contain any number of key=value pairs, and some keys occur more than once; for those keys I want the values collected into an array that becomes the value for that key.

2 Answers

0 votes
/ 09 October 2018

If you want to split the pairs out into columns and build a new dataframe, you can use pandas. My solution is below:

>>> import pandas as pd
>>> 
>>> # read the raw semicolon-separated lines
>>> rdd = sc.textFile('/home/ali/text1.txt')
>>> rdd.first()
'nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9'
>>> # split every record into its key=value pairs
>>> rddMap = rdd.map(lambda x: x.split(';'))
>>> rddMap.first()
['nocid=no', 'store=2007', 'tppid=45c566dd-00d7-4193-b5c7-17843c2764e9']
>>> 
>>> # turn each record into a one-row pandas frame and stack them up
>>> df1 = pd.DataFrame()
>>> for rec in rddMap.collect():
...     a = {i.split('=')[0]:i.split('=')[1] for i in rec}
...     df2 = pd.DataFrame([a], columns=a.keys())
...     df1 = pd.concat([df1, df2])
... 
>>> # back to Spark; the string 'nan' becomes a real null
>>> df = spark.createDataFrame(df1.astype(str)).replace('nan', None)
>>> df.show()
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
|keywords|nocid|pagetype|  pos|          search|               shelf|store|               tppid|
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
|    null|   no|    null| null|            null|                null| 2007|45c566dd-00d7-419...|
|    null|   no|    null| null|            null|                null| 3084|4cd36fde-c59a-41d...|
|    null|   no|    null| null|            null|                null| null|c688c1be-a9c5-47a...|
|    null|  yes|    null| null|  washing liquid|                null| 3060|                null|
|    null| null|    null|  top|            null|                null| null|278bab7b-d40b-478...|
|    null| null|    null|  top|            null|                null| null|00bb87fa-f3f5-4b0...|
|    null|   no|    null| null|            null|cleanser-toner-an...| 2019|84006d41-eb63-4ae...|
|    null| null|    null|  top|            null|                null| null|ed02b037-066b-46b...|
|    null|  yes|    null| null|           salad|                null| 3060|                null|
|    null|   no|    null|  top|            null|                null| 2882|164563e4-8e5c-436...|
|    null|  yes|    null| null|            beer|                null| 3060|                null|
|    null|   no|    null| null|washing capsules|                null| 5528|4f9b99eb-65ff-4fb...|
|    null| null|    null|right|            null|                null| null|ddb54247-a5c9-40a...|
|    null|  yes|    null| null|         bedding|                null| 3060|                null|
|    null| null|    null|  top|            null|                null| null|                null|
|balloons| null|category| mpu1|            null|                null| null|                null|
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
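A side note on the loop above: `pd.concat` inside a loop rebuilds the frame on every iteration. The same row dictionaries can be built in one pass and handed to pandas in a single call. A minimal sketch, with two of the sample rows inlined so it is self-contained (note that a plain dict keeps only the last value for a repeated key such as `keywords`, exactly like the dict comprehension in the loop):

```python
# Two rows from the sample data, inlined so the sketch stands alone
lines = [
    'nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9',
    'nocid=yes;search=washing liquid;store=3060',
]

# One dict per row; split each pair on the first '=' only,
# so values containing '=' would not be truncated
records = [dict(kv.split('=', 1) for kv in line.split(';')) for line in lines]

# pd.DataFrame(records) now builds the whole frame in one call,
# with missing keys filled as NaN automatically
```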
0 votes
/ 09 October 2018

Try this:

import pyspark.sql.functions as F
from pyspark.sql.types import *

def convert_to_json(_str):
    # keep only well-formed key=value pairs
    _split_str = [tuple(x.split('=')) for x in _str.split(';') if len(tuple(x.split('='))) == 2]
    _json = {}
    for k, v in _split_str:
        # collect values for repeated keys into one list
        if k in _json:
            _json[k].append(v)
        else:
            _json[k] = [v]

    return _json

convert_udf = F.udf(convert_to_json, MapType(StringType(), ArrayType(StringType())))
df = df.withColumn('customtargeting', convert_udf('customtargeting'))

print(df.schema)
print(df.limit(5).collect())

This gives you the schema and the output:

StructType(List(StructField(
customtargeting,MapType(StringType,ArrayType(StringType,true),true),true)))

[Row(customtargeting={u'store': [u'2007'], u'tppid': [u'45c566dd-00d7-4193-b5c7-17843c2764e9'], u'nocid': [u'no']}), 
 Row(customtargeting={u'store': [u'3084'], u'tppid': [u'4cd36fde-c59a-41d2-a2b4-b731b6cfbe05'], u'nocid': [u'no']}), 
 Row(customtargeting={u'nocid': [u'no'], u'tppid': [u'c688c1be-a9c5-47a2-8c09-aef175a19847']}), 
 Row(customtargeting={u'search': [u'washing liquid'], u'nocid': [u'yes'], u'store': [u'3060']}), 
 Row(customtargeting={u'pos': [u'top'], u'tppid': [u'278bab7b-d40b-4783-8f89-bef94a9f5150']})]
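Since `convert_to_json` is plain Python, its logic can be sanity-checked without a Spark session — for instance against the multi-valued `keywords` row from the sample data. The function body is repeated here (slightly condensed with `dict.setdefault`) so the snippet stands alone:

```python
def convert_to_json(_str):
    # keep only well-formed key=value pairs, then group values per key
    _split_str = [tuple(x.split('=')) for x in _str.split(';')
                  if len(tuple(x.split('='))) == 2]
    _json = {}
    for k, v in _split_str:
        _json.setdefault(k, []).append(v)
    return _json

# the multi-valued 'keywords' row from the sample data
row = ('pos=mpu1;keywords=helium canisters;keywords=tesco.com;'
       'keywords=helium canisters reviews;keywords=tesco;'
       'keywords=helium canisters uk;keywords=balloons;pagetype=category')
result = convert_to_json(row)
```

Every key maps to a list, and repeated keys accumulate all of their values — which is exactly the `MapType(StringType(), ArrayType(StringType()))` shape the UDF declares.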