Я совершенно новичок в Spark и пишу конвейер для выполнения некоторых преобразований в список проверок.
Пример моих данных:
{
"id": 932522712299,
"ticket_id": 12,
"created_at": "2020-02-14T19:05:16Z",
"author_id": 392401450482,
"events": ["{\"id\": 932522713292, \"type\": \"VoiceComment\", \"public\": false, \"data\": {\"from\": \"11987654321\", \"to\": \"+1987644\"}"],
}
Моя схема в основном :
root
|-- id: long (nullable = true)
|-- ticket_id: long (nullable = true)
|-- created_at: string (nullable = true)
|-- author_id: long (nullable = true)
|-- events: array (nullable = true)
| |-- element: string (containsNull = true)
Мои преобразования состоят из нескольких шагов:
Разделение событий по типу: комментарии, теги, изменение или обновление;
Для каждого найденного события я должен добавить ticket_id, author_id и creation_at из корня;
Он должен иметь один выход для каждого типа события.
По сути, каждый объект в массиве события представляет собой строку JSON, потому что каждый тип имеет свою структуру - единственный общий атрибут между ними - это type
.
Я достиг своих целей, выполняя некоторые ужасная работа, преобразовав мой dataframe в dict, используя следующий код:
audits = list(map(lambda row: row.asDict(), df.collect()))`
comments = []
for audit in audits:
base_info = {'ticket_id': audit['ticket_id'], 'created_at': audit['created_at'], 'author_id': audit['author_id']}
audit['events'] = [json.loads(x) for x in audit['events']]
audit_comments = [
{**x, **base_info}
for x in audit['events']
if x['type'] == "Comment" or x['type'] == "VoiceComment"
]
comments.extend(audit_comments)
Возможно, этот вопрос звучит глупо или лениво, но я действительно застрял в простых вещах, таких как:
- how проанализировать элементы 'событий' для структурирования?
- как выбрать событие по типу и добавить информацию из root? Может быть, использовать синтаксис выбора?
Любая помощь приветствуется.