Pyspark обрабатывает массив структурного типа в автоматическом режиме - PullRequest
0 голосов
/ 07 января 2019

У меня есть JSON со структурой и "массивом типа структуры данных", я могу автоматически проанализировать JSON для типа структуры. Для массива типа структуры мне нужно обработать массив структуры, для которого мой код не работает. У меня есть подход, я не уверен, где вписаться в код. Завершение с ошибкой

Ниже мой JSON

{
"Details": {
    "ReferenceDetails": "6CK",
    "Number": "77",
    "Reason": 1,
    "port1": "Nav",
    "port2": "Sri",
    "Disrupted": "2008-02-08T00:00:00Z"
},
"CD": {
    "FirstName": "Sam",
    "LastName": "HE",
    "Country": "IN",
    "County": "Indi",
    "Postcode": "12",
    "Address1": "India",
    "Address2": "",
    "Town": "Indi",
    "EMail": "aro@in.com",
    "TelephoneCountryCode": "+1",
    "TelephoneRgn": "965895",
    "Telephone": "65895",
    "Agent": "",
    "IsAgent": true,
    "IdB": true,
    "IsYes": true
},
"Passengers": [{
    "FirstName": "D",
    "LastName": "Sunil"
}],
"IpAddress": "123223",
"Type": "IND",
"Language": "en",
"IsValidated": true,
"CF": "01",
"TimeStamp": "2018-12-18 05:19:45.998",
"Code": "INS"
}

Я попытался изменить тип данных с помощью регулярных выражений, изменив arrat <на пусто и завершив ">>" на ">". Причина для этого у меня есть код, который дифференцирует тип структуры. Теперь я хотел, чтобы массив struct был struct

Контроль над типами данных осуществляется df.dtypes, зацикливающимся на этих dtypes. Внизу раздела есть код

Ниже был опробован подход: Для массива типа структуры я использую опцию взрыва, и она работает нормально. Но я хотел без опции разнесения, так как разнесение - это обработка, установив контроль над именем столбца

Ниже приведен фрагмент для обработки массива структуры с разнесением

flat_df = flat_df.withColumn("Passenger_FirstName", explode(flat_df.Passengers.FirstName)).withColumn("Passenger_LastName", explode(flat_df.Passengers.LastName)).drop("Passengers")

Ниже приведен код для чтения json и сглаживания, который я пробовал для массива struct в автоматическом фейшировании, заменив "array <" на "" и ">>" на ">". Решение для этого было бы очень очень полезно

read_json=spark.read.option("charset", "UTF-8").option("multiline", "false").json(sc.wholeTextFiles("s3a://folder1/folder2/",minPartitions=32).values())

flat_cols = [c[0] for c in read_json.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = read_json.select(flat_cols + [col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in read_json.select(nc+'.*').columns])

for c in read_json.dtypes:
     re.sub('array<','',read_json.dtypes)
     re.sub('>>','>',read_json.dtypes)

Ошибка, с которой я столкнулся ниже:

Traceback (последний вызов был последним): Файл "", строка 2, в Файл "/opt/cloudera/parcels/Anaconda-5.0.1/lib/python2.7/re.py", строка 155, в sub вернуть _compile (шаблон, флаги) .sub (repl, string, count) Ошибка типа: ожидаемая строка или буфер

Я ожидаю, что все мои вложенные столбцы должны быть сведены в одном кадре данных. Я не хочу трогать имена атрибутов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...