Как преобразовать столбец json без схемы в словарь, json или другую таблицу данных в PySpark - PullRequest
0 голосов
/ 18 марта 2020

Я работаю с PysPark, и я довольно новичок в этой области, у меня есть DataFrame (df) с 15 столбцами (например, col1, col2, col3, ... col15) и соответствующая таблица LOG для отслеживания изменений внутри T, LOG TABLE (LG) имеет 3 столбца (ID, EntityID, Changes), а столбец «изменения» содержит изменения. Например, у меня может быть строка внутри LG, например:

+----+---+-------------------------------------------------------------------+
|ID  | EntityID |        Changes                                             |
+----+---+-------------------------------------------------------------------+
|1   |2         |{"Col1" :val1 ; "Col2":val2 ; "Col12" : val12}              |
|2   |3         |{"Col7" :val7 ; "Col12":val12;"Col19":val19 ;"Col15":val15} |
|3   |3         |{"Col1" :val1 ; col2:val2; ....            ;"Col15":val15}  |
+----------------------------------------------------------------------------|

СЕЙЧАС я хочу чтобы сделать некоторые агрегации на то, что у меня есть внутри «Изменения», например, сумма (Col5), AVG (Col 10) groupBy (EntityID) и .... Я не знаю, «что» делать и «как» это сделать Каст "меняет" на Dict или json или даже на фрейм данных ?? и как это сделать ? Я прочитал о json, взорваться и ... но проблема в том, что моя колонка не является правильно сформированной json Я думаю. Было бы полезно, если бы кто-нибудь мог предложить мне решение, позволяющее мне выполнять агрегирование некоторых ключей и значений внутри строки.

1 Ответ

0 голосов
/ 08 апреля 2020

Здесь я нашел подход для этого. Прежде всего, Json нужно было проверить (у меня было что-то вроде этого {"col1": 10, "col2": 2020-04-01 16:55:12}, поэтому pyspark. sql .functions.from_ json не удалось сопоставить JSON строку с желаемой схемой. Изменение Col2 на "2020-04-01 16:55:12" решило ее.

#F is  pyspark.sql.functions
#schm in my json Schema sth like  
schm = StructType([
StructField("col1", LongType(), True),
StructField("col2", StringType(), True)])      
df.select(F.from_json(F.col("changes"),
schm
).alias("jsn")).select( 
"jsn.Col1","jsn.Col2").show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...