У меня есть образец кода pyspark, в котором я пытаюсь сгенерировать структуру json. Ниже приведен код
def func(row):
temp=row.asDict()
headDict = {}
headDict['type'] = "record"
headDict['name'] = "source"
headDict['namespace'] = "com.streaming.event"
headDict['doc'] = "SCD signals from source"
fieldslist = []
headDict['fields'] = fieldslist
for i in temp:
fieldslist.append({i:temp[i]})
return (json.dumps(headDict))
if __name__ == "__main__":
spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()
payload=udf(func,StringType())
data = spark.createDataFrame(
[
(1, "a", 'foo1'), # create your data here, be consistent in the types.
(2, "b", 'bar'),
(3, "c", 'mnc')
],
['id', 'nm', 'txt'] # add your columns label here
)
df=data.withColumn("payload1",payload(struct([data[x] for x in data.columns])))
df.show(3,False)
Я получаю сообщение об ошибке при вставке данных в фрейм данных
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple '{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}' with StructType
Если я пытаюсь распечатать полезную нагрузку json, я получаю правильный вывод
{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}
Я также подтвердил, что это действительный json.
Я не уверен, что мне здесь не хватает.
Возможно, это проблема версии python ? Я использую python 2.7
Update - я пытался запустить тот же код, используя python 3.7, и он работает нормально