Обработка событий из концентратора событий с помощью pyspark - Databricks - PullRequest
0 голосов
/ 12 января 2019

У меня есть поток изменений Mongo (приложение pymongo), который постоянно получает изменения в коллекциях. Эти документы об изменениях, полученные программой, отправляются в концентраторы событий Azure. Записная книжка Spark должна читать документы, когда они попадают в концентратор событий, и сопоставлять схемы (сопоставлять поля в документе со столбцами таблиц искр) с таблицей искр для этой коллекции. Если в документе меньше полей, чем в таблице, столбцы должны быть добавлены с нулем.

Я читаю события из Event Hub, как показано ниже.

spark.readStream.format("eventhubs").option(**config).load().

Как сказано в документации, исходное сообщение находится в столбце «body» информационного кадра, который я преобразовываю в строку. Теперь у меня есть документ Mongo в виде строки JSON в потоковом фрейме данных. У меня проблемы ниже.

Мне нужно извлечь отдельные поля в документе монго. Это необходимо для сравнения того, какие поля присутствуют в таблице spark, а какие нет в документе Mongo. Я видел функцию с именем get_json_object (col, path). По сути, это снова возвращает строку, и я не могу по отдельности выбрать все столбцы.

Если можно использовать from_json для преобразования строки JSON в тип Struct, я не могу указать схему, потому что у нас есть около 70 коллекций (соответствующее количество таблиц искр), каждая из которых отправляет документы Mongo с полями от 10 до 450.

Если я могу преобразовать строку JSON в потоковом фрейме данных в объект JSON, схема которого может быть выведена из фрейма данных (что-то вроде того, что может сделать read.json), я могу использовать представление SQL * для извлечения отдельных столбцов, сделайте несколько манипуляции, а затем сохранить окончательный кадр данных в таблицу искры. Возможно ли это сделать? Какую ошибку я совершаю?

Примечание: Stram DF не поддерживает метод collect () для отдельного извлечения строки JSON из базового rdd и выполнения необходимых сравнений столбцов. Использование Spark 2.4 и Python в среде Azure Databricks 4.3.

Ниже приведен пример данных, которые я получаю в своей записной книжке после чтения событий из концентратора событий и приведения их к строке.

{
  "documentKey": "5ab2cbd747f8b2e33e1f5527",
  "collection": "configurations",
  "operationType": "replace",
  "fullDocument": {
    "_id": "5ab2cbd747f8b2e33e1f5527",
    "app": "7NOW",
    "type": "global",
    "version": "1.0",
    "country": "US",
    "created_date": "2018-02-14T18:34:13.376Z",
    "created_by": "Vikram SSS",
    "last_modified_date": "2018-07-01T04:00:00.000Z",
    "last_modified_by": "Vikram Ganta",
    "last_modified_comments": "Added new property in show_banners feature",
    "is_active": true,
    "configurations": [
      {
        "feature": "tip",
        "properties": [
          {
            "id": "tip_mode",
            "name": "Delivery Tip Mode",
            "description": "Tip mode switches the display of tip options between percentage and amount in the customer app",
            "options": [
              "amount",
              "percentage"
            ],
            "default_value": "tip_percentage",
            "current_value": "tip_percentage",
            "mode": "multiple or single"
          },
          {
            "id": "tip_amount",
            "name": "Tip Amounts",
            "description": "List of possible tip amount values",
            "default_value": 0,
            "options": [
              {
                "display": "No Tip",
                "value": 0
              }
            ]
          }
        ]
      }
    ]
  }
}

Я бы хотел отделить и вынуть full_document в примере выше. Когда я использую get_json_object, я получаю full_document в другом потоковом фрейме данных как строку JSON, а не как объект. Как вы можете видеть, в full_document есть несколько типов массивов, которые я могу раскрыть (в документации сказано, что explode поддерживается в потоковом DF, но еще не пробовал), но есть также некоторые объекты (например, тип структуры), которые я хотел бы извлечь для отдельных лиц. поля. Я не могу использовать нотацию SQL '*', потому что get_json_object возвращает строку, а не сам объект.

1 Ответ

0 голосов
/ 17 января 2019

Убедительно, что эта сильно различающаяся Схема JSON была бы лучше, если бы схема упоминалась явно. Таким образом, я понял, что в потоковой среде с очень разной схемой входящего потока всегда лучше указывать схему. Итак, я продолжаю с get_json_object и from_json и читаю схему через файл.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...