Попробуйте так:
import pyspark.sql.functions as F
from pyspark.sql.types import *
def convert_to_json(_str):
    """Parse a ``key=value;key=value`` string into a dict of value lists.

    Segments are separated by ``;``. A segment is kept only when splitting
    it on ``=`` yields exactly two parts; segments with no ``=`` or with
    more than one ``=`` are skipped. Values of a repeated key are collected
    in order of appearance.

    Args:
        _str: The raw string to parse, or ``None``.

    Returns:
        A dict mapping each key to the list of its values, or ``None`` when
        ``_str`` is ``None``.
    """
    # When used as a Spark UDF, null cells arrive as None; pass them through
    # instead of failing on ``None.split``.
    if _str is None:
        return None

    _json = {}
    for segment in _str.split(';'):
        parts = segment.split('=')
        # Keep only well-formed "key=value" segments (exactly one '=').
        if len(parts) != 2:
            continue
        key, value = parts
        _json.setdefault(key, []).append(value)
    return _json
# Wrap the parser as a Spark UDF with declared return type
# map<string, array<string>>.
convert_udf = F.udf(convert_to_json, MapType(StringType(), ArrayType(StringType())))
# Overwrite the raw string column with its parsed map form.
df = df.withColumn('customtargeting', convert_udf('customtargeting'))
# Single-argument print(...) is valid under both Python 2 and Python 3,
# unlike the Python 2-only print statement.
print(df.schema)
print(df.limit(5).collect())
В результате получаются следующие схема и вывод:
StructType(List(StructField(
customtargeting,MapType(StringType,ArrayType(StringType,true),true),true)))
[Row(customtargeting={u'store': [u'2007'], u'tppid': [u'45c566dd-00d7-4193-b5c7-17843c2764e9'], u'nocid': [u'no']}),
Row(customtargeting={u'store': [u'3084'], u'tppid': [u'4cd36fde-c59a-41d2-a2b4-b731b6cfbe05'], u'nocid': [u'no']}),
Row(customtargeting={u'nocid': [u'no'], u'tppid': [u'c688c1be-a9c5-47a2-8c09-aef175a19847']}),
Row(customtargeting={u'search': [u'washing liquid'], u'nocid': [u'yes'], u'store': [u'3060']}),
Row(customtargeting={u'pos': [u'top'], u'tppid': [u'278bab7b-d40b-4783-8f89-bef94a9f5150']})]