У меня есть JSON со структурой и "массивом типа структуры данных", я могу автоматически проанализировать JSON для типа структуры. Для массива типа структуры мне нужно обработать массив структуры, для которого мой код не работает. У меня есть подход, я не уверен, где вписаться в код. Завершение с ошибкой
Ниже мой JSON
{
"Details": {
"ReferenceDetails": "6CK",
"Number": "77",
"Reason": 1,
"port1": "Nav",
"port2": "Sri",
"Disrupted": "2008-02-08T00:00:00Z"
},
"CD": {
"FirstName": "Sam",
"LastName": "HE",
"Country": "IN",
"County": "Indi",
"Postcode": "12",
"Address1": "India",
"Address2": "",
"Town": "Indi",
"EMail": "aro@in.com",
"TelephoneCountryCode": "+1",
"TelephoneRgn": "965895",
"Telephone": "65895",
"Agent": "",
"IsAgent": true,
"IdB": true,
"IsYes": true
},
"Passengers": [{
"FirstName": "D",
"LastName": "Sunil"
}],
"IpAddress": "123223",
"Type": "IND",
"Language": "en",
"IsValidated": true,
"CF": "01",
"TimeStamp": "2018-12-18 05:19:45.998",
"Code": "INS"
}
Я попытался изменить тип данных с помощью регулярных выражений, изменив arrat <на пусто и завершив ">>" на ">". Причина для этого у меня есть код, который дифференцирует тип структуры. Теперь я хотел, чтобы массив struct был struct
Контроль над типами данных осуществляется df.dtypes, зацикливающимся на этих dtypes. Внизу раздела есть код
Ниже был опробован подход:
Для массива типа структуры я использую опцию взрыва, и она работает нормально. Но я хотел без опции разнесения, так как разнесение - это обработка, установив контроль над именем столбца
Ниже приведен фрагмент для обработки массива структуры с разнесением
flat_df = flat_df.withColumn("Passenger_FirstName", explode(flat_df.Passengers.FirstName)).withColumn("Passenger_LastName", explode(flat_df.Passengers.LastName)).drop("Passengers")
Ниже приведен код для чтения json и сглаживания, который я пробовал для массива struct в автоматическом фейшировании, заменив "array <" на "" и ">>" на
">". Решение для этого было бы очень очень полезно
read_json=spark.read.option("charset", "UTF-8").option("multiline", "false").json(sc.wholeTextFiles("s3a://folder1/folder2/",minPartitions=32).values())
flat_cols = [c[0] for c in read_json.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = read_json.select(flat_cols + [col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in read_json.select(nc+'.*').columns])
for c in read_json.dtypes:
re.sub('array<','',read_json.dtypes)
re.sub('>>','>',read_json.dtypes)
Ошибка, с которой я столкнулся ниже:
Traceback (последний вызов был последним):
Файл "", строка 2, в
Файл "/opt/cloudera/parcels/Anaconda-5.0.1/lib/python2.7/re.py", строка 155, в sub
вернуть _compile (шаблон, флаги) .sub (repl, string, count)
Ошибка типа: ожидаемая строка или буфер
Я ожидаю, что все мои вложенные столбцы должны быть сведены в одном кадре данных. Я не хочу трогать имена атрибутов