Spark - преобразовать массив JSON Strings в массив Struct, отфильтровать и объединить с root - PullRequest
1 голос
/ 17 февраля 2020

Я совершенно новичок в Spark и пишу конвейер для выполнения некоторых преобразований в список проверок.

Пример моих данных:

{
    "id": 932522712299,
    "ticket_id": 12,
    "created_at": "2020-02-14T19:05:16Z",
    "author_id": 392401450482,
    "events": ["{\"id\": 932522713292, \"type\": \"VoiceComment\", \"public\": false, \"data\": {\"from\": \"11987654321\", \"to\": \"+1987644\"}"],
}

Моя схема в основном :

root
 |-- id: long (nullable = true)
 |-- ticket_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- author_id: long (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: string (containsNull = true)

Мои преобразования состоят из нескольких шагов:

  • Разделение событий по типу: комментарии, теги, изменение или обновление;

  • Для каждого найденного события я должен добавить ticket_id, author_id и creation_at из корня;

  • Он должен иметь один выход для каждого типа события.

По сути, каждый объект в массиве события представляет собой строку JSON, потому что каждый тип имеет свою структуру - единственный общий атрибут между ними - это type.

Я достиг своих целей, выполняя некоторые ужасная работа, преобразовав мой dataframe в dict, используя следующий код:

audits = list(map(lambda row: row.asDict(), df.collect()))`
comments = []
for audit in audits:
    base_info = {'ticket_id': audit['ticket_id'], 'created_at': audit['created_at'], 'author_id': audit['author_id']}
    audit['events'] = [json.loads(x) for x in audit['events']]

    audit_comments = [
        {**x, **base_info}
        for x in audit['events']
        if x['type'] == "Comment" or x['type'] == "VoiceComment"
    ]
    comments.extend(audit_comments)

Возможно, этот вопрос звучит глупо или лениво, но я действительно застрял в простых вещах, таких как:

  • how проанализировать элементы 'событий' для структурирования?
  • как выбрать событие по типу и добавить информацию из root? Может быть, использовать синтаксис выбора?

Любая помощь приветствуется.

Ответы [ 2 ]

2 голосов
/ 17 февраля 2020

Поскольку элементы массива events не имеют одинаковую структуру для всех строк, вы можете преобразовать их в Map(String, String).

, используя from_json функция и схема MapType(StringType(), StringType()):

df = df.withColumn("events", explode("events"))\
       .withColumn("events", from_json(col("events"), MapType(StringType(), StringType())))

Затем, используя element_at (Spark 2.4+), вы можете получить type следующим образом:

df = df.withColumn("event_type", element_at(col("events"), "type"))

df.printSchema()

#root
 #|-- author_id: long (nullable = true)
 #|-- created_at: string (nullable = true)
 #|-- events: map (nullable = true)
 #|    |-- key: string
 #|    |-- value: string (valueContainsNull = true)
 #|-- id: long (nullable = true)
 #|-- ticket_id: long (nullable = true)
 #|-- event_type: string (nullable = true)

Теперь вы можете фильтровать и выбирать обычные столбцы:

df.filter(col("event_type") == lit("VoiceComment")).show(truncate=False)

#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|author_id   |created_at          |events                                                                                                     |id          |ticket_id|event_type  |
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|392401450482|2020-02-14T19:05:16Z|[id -> 932522713292, type -> VoiceComment, public -> false, data -> {"from":"11987654321","to":"+1987644"}]|932522712299|12       |VoiceComment|
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
1 голос
/ 17 февраля 2020

Ваш код загрузит полные данные о событиях на главный узел, который отправил задание. Исключительный способ обработки данных требует, чтобы вы создали карту для сокращения заданий. Для этого существует несколько API - они создают план DAG для работы, и план проявляется только при вызове определенных c функций, таких как head or show. Такая работа будет распространяться на все машины в кластере.

При работе с API-фреймами данных многое можно сделать с помощью pyspark.sql.functions

Ниже тех же преобразований с помощью spark. sql dataframe api

import pyspark.sql.functions as F

df = df.withColumn('event', F.explode(df.events)).drop(df.events)
df = df.withColumn('event', F.from_json(df.event, 'STRUCT <id: INT, type: STRING, public: Boolean, data: STRUCT<from: STRING, to: STRING>>'))
events = df.where('event.type = "Comment" OR event.type == "VoiceComment"')

events.printSchema()
events.head(100)

Когда данные не могут быть обработаны с помощью выражений sql, вы можете реализовать простую пользовательскую функцию - UDF или Pandas UDF

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...