Разбор столбца из формата json в pyspark - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть фрейм данных, где один из столбцов имеет формат json, как показано ниже:

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- row_number: integer (nullable = true)
 |-- json: struct (nullable = true)
 |    |-- @timestamp: string (nullable = true)
 |    |-- @version: string (nullable = true)
 |    |-- beat: struct (nullable = true)
 |    |    |-- hostname: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |-- host: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- input: struct (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- prospector: struct (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- station: string (nullable = true)
 |    |-- tags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- timestamp: string (nullable = true)

Мне интересен столбец message в формате json, который очень большая строка из бревен. Столбец сообщения имеет собственную схему, которую я проанализировал с помощью pandas. Например, у него есть такие элементы, как #columns, #rows, #blocked, #rule, #process_duration et c, перед которыми #, которые я идентифицирую как части схемы. Используя pandas, я смог разделить их и упорядочить в удобочитаемые столбцы, но мне нужно сделать это для очень большого набора данных, и мой экземпляр EC2 продолжает выделяться. В любом случае, я не думаю, что будет эффективно преобразовать фрейм данных искры в pandas и затем снова запустить спарк. Мой код python pandas был таким:

y = json.loads(x[0])
col_names = pand.read_csv(StringIO(y["message"]), nrows = 1, skiprows=[0, 1], sep =' ', header=None,quotechar='\'', index_col=False)
dispatch_list=pand.read_csv(StringIO(y["message"]), sep=' ', header=None,
                                         comment='#', skiprows=[1], quotechar='\'',
                                         names = col_names.iloc[0 , 2:].tolist(),
                                         index_col=False)

Я в основном хочу сделать то, что делает мой код pandas, но с использованием pyspark. Это возможно? Любая помощь будет принята с благодарностью. Спасибо

Обновление с примерами данных в столбце сообщения:

#headers 
'STATION: Harlem     MOVEABLE LOTS:  5 QTY:    0     HELD LOTS:   0 QTY:    0     BLOCKED LOTS:   0 QTY:    0'
#columns  'TargetEQP' 'GRank' 'LRank' 'Priority' 'Lot ID' 'Carrier ID' 'Auto Reason This Eqp' 'Full Auto Reason Target Eqp' 'Carrier Eqp ID' 'Carrier Station ID' 'Carrier Xfer Status' 'Reservation' 'Qty' 'Cond Type' 'PD ID' 'Oper Name' 'Oper No' 'Lot Processing State' 'Product ID' 'Machine Recipe ID' 'Ch_Comb_This_Eqp' 'Ch_Avail_This_Eqp'  'Multi Lot Type' 'Hold State' 'Route ID' 'Process RunSize Maximum' 'Sub Lot Type' 'Age' 'WI' 'SS' 'Time to Q Expr' 'Rush' 'Due Date' 'Due Date Source' 'Allowed Tools' 'Full Auto Reason This Eqp' 'TargetStocker' 'TargetPort' 'Processing Time' 'Addl Rework' 'Asset Owner' 'pass_count'
#widths  12 3 12 12 6 8 12 12 12 12 2 12 3 12 5 12 6 12 12 1 12 12 12 12 12 3 6 12 1 6 12 3 12 12 3 12 12 12 12 12 12 12 12 12 1
#types 'STRING' 'INTEGER' 'INTEGER' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'INTEGER' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'INTEGER' 'STRING' 'STRING' 'REAL' 'REAL' 'STRING' 'INTEGER' 'INSTANT' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'STRING' 'INTEGER' 
#lotcol -1
#rows
'Eqp1' '1' '1' 'Slow' 'LOT ID1' '6C20' 'N: QualEqp,EIProblem,NoAutoPorts' 'N: QualEqp(QualEqpId script parm is Eqp2),EIProblem(Eqp not in sync:  NOTINSYNCH),NoAutoPorts(Eqp has no online nor offline ports)' '' 'ZFG1' 'SI' '' '1' '' 'Recipe1' 'Recipe_Stem_1' '1000.1000' 'Waiting' 'Recipe_Stem_2' '' 'Recipe 2' 'Logical Recipe 2' '' '' '' 'ML-MR' 'NOTONHOLD' 'Recipe_Core' '1' 'Equipment Monitor' '0hr 51m' '1.3' '-153.1' '' '8' '12/15/18 02:41:42' 'Blocked PRD WIP' 'Block:Eqp1' 'N: QualEqp(QualEqpId script parm is Eqp2),EIProblem(Eqp not in sync:  NOTINSYNCH),NoAutoPorts(Eqp has no online nor offline ports)' '' '' '0' '' '' '1' 
#blocked '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' 
#rule 'NofM'
#station 'Harlem'
#station_type 'STATION'
#queue_duration '1.50204'
#process_duration '11.431'

Здесь элементы в объекте #column должны соответствовать элементам в объекте #rows. Например, столбец TargetEQP должен иметь Eqp1 под ним, а столбец Priority должен иметь Slow под ним и т. Д.

...