Я пытаюсь создать схему искры, которая нужна для расширения возможностей при создании кадра данных. Я могу сгенерировать схему json, используя ниже
from pyspark.sql.types import StructType
# Save schema from the original DataFrame into json:
schema_json = df.schema.json()
, что дает мне
{"fields":[{"metadata":{},"name":"cloud_events_version","nullable":true,"type":"string"},{"metadata":{},"name":"data","nullable":true,"type":{"fields":[{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"member_role","nullable":true,"type":"string"},{"metadata":{},"name":"reg_source_product","nullable":true,"type":"string"},{"metadata":{},"name":"school_type","nullable":true,"type":"string"},{"metadata":{},"name":"year_in_college","nullable":true,"type":"long"}],"type":"struct"}},{"metadata":{},"name":"event_time","nullable":true,"type":"string"},{"metadata":{},"name":"event_type","nullable":true,"type":"string"},{"metadata":{},"name":"event_type_version","nullable":true,"type":"string"},{"metadata":{},"name":"event_validated_ts","nullable":true,"type":"string"},{"metadata":{},"name":"event_validation_status","nullable":true,"type":"string"},{"metadata":{},"name":"extensions","nullable":true,"type":{"fields":[{"metadata":{},"name":"client_common","nullable":true,"type":{"fields":[{"metadata":{},"name":"adobe_mcid","nullable":true,"type":"string"},{"metadata":{},"name":"adobe_sdid","nullable":true,"type":"string"},{"metadata":{},"name":"auth_state","nullable":true,"type":"string"},{"metadata":{},"name":"uuid","nullable":true,"type":"string"},{"metadata":{},"name":"client_experiments","nullable":true,"type":"string"},{"metadata":{},"name":"client_ip_address","nullable":true,"type":"string"},{"metadata":{},"name":"device_id","nullable":true,"type":"string"},{"metadata":{},"name":"page_name","nullable":true,"type":"string"},{"metadata":{},"name":"referral_url","nullable":true,"type":"string"},{"metadata":{},"name":"url","nullable":true,"type":"string"},{"metadata":{},"name":"user_agent","nullable":true,"type":"string"},{"metadata":{},"name":"uvn","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"source","nullable":true,"type":"string"},{"metadata":{},"name":"validated_message","nullable":true,"type":"string"},{"metadata":{},"name":"year","nullable":true,"type":"integer"},{"metadata":{},"name":"mon","nullable":true,"type":"integer"},{"metadata":{},"name":"day","nullable":true,"type":"integer"},{"metadata":{},"name":"hour","nullable":true,"type":"integer"}],"type":"struct"}
Нодля этого мне нужно проанализировать фрейм данных, что занимает немного времени, и я пытаюсь избежать
Одна вещь, которую я могу сделать, - это извлечь нужную схему из каталога, который у нас есть внутри.Что дает что-то вроде
[{u'Name': u'cloud_events_version', u'Type': u'string'},
{u'Name': u'event_type', u'Type': u'string'},
{u'Name': u'event_time', u'Type': u'string'},
{u'Name': u'data', u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
{u'Name': u'source', u'Type': u'string'},
{u'Name': u'extensions', u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
{u'Name': u'event_type_version', u'Type': u'string'},
{u'Name': u'event_validation_status', u'Type': u'string'},
{u'Name': u'event_validated_ts', u'Type': u'string'},
{u'Name': u'validated_message', u'Type': u'string'}]
Я пытаюсь написать рекурсивный запрос Python, который генерирует вышеупомянутый JSON.Логика состоит в том, чтобы циклически проходить через этот список dict и назначать имя и тип этому словарю, когда тип является строкой
{"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
, но когда тип является структурой, он создает список словаря всех элементовструктурируйте и присвойте его типу и делайте это рекурсивно, пока он не найдет какую-либо структуру.
Все, что я могу собрать, это
def structRecursive(columnName,columnType):
if "struct" not in columnType:
ColumnDict = {"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
else:
structColumnList = []
structColumnDict = {
'metadata': {},
'name': columnName,
'nullable': True,
'type': {'fields': structColumnList, 'type': 'struct'}
}
if columnType.count('struct<')==1:
structCol = columnName
structColList = columnType.encode('utf-8').replace('struct<',
'').replace('>', '').split(',')
for item in structColList:
fieldName = item.split(':')[0]
dataType = item.split(':')[1]
nodeDict = {}
nodeDict['metadata'] = {}
nodeDict['name'] = '{}'.format(fieldName)
nodeDict['nullable'] = True
nodeDict['type'] = '{}'.format(dataType)
structColumnList.append(nodeDict)
else:
columnName = columnType.replace('struct<','',1).replace('>','').split(':')[0]
columnType = columnType.split("{}:".format(columnName),1)[1].replace('>','',1)
return structColumnDict
MainStructList = []
MainStructDict = {'fields': MainStructList, 'type': 'struct'}
for item in ListOfDict :
columnName = item['Name'].encode('utf-8')
columnType = item['Type'].encode('utf-8')
MainStructList.append(structRecursive(columnName,columnType))
Конечно, это не дает желаемого результата.Хотел бы получить предложение здесь.