У меня есть поток изменений Mongo (приложение pymongo), который постоянно получает изменения в коллекциях. Эти документы об изменениях, полученные программой, отправляются в концентраторы событий Azure. Записная книжка Spark должна читать документы, когда они попадают в концентратор событий, и сопоставлять схемы (сопоставлять поля в документе со столбцами таблиц искр) с таблицей искр для этой коллекции. Если в документе меньше полей, чем в таблице, столбцы должны быть добавлены с нулем.
Я читаю события из Event Hub, как показано ниже.
spark.readStream.format("eventhubs").option(**config).load().
Как сказано в документации, исходное сообщение находится в столбце «body» информационного кадра, который я преобразовываю в строку. Теперь у меня есть документ Mongo в виде строки JSON в потоковом фрейме данных. У меня проблемы ниже.
Мне нужно извлечь отдельные поля в документе монго. Это необходимо для сравнения того, какие поля присутствуют в таблице spark, а какие нет в документе Mongo. Я видел функцию с именем get_json_object (col, path). По сути, это снова возвращает строку, и я не могу по отдельности выбрать все столбцы.
Если можно использовать from_json для преобразования строки JSON в тип Struct, я не могу указать схему, потому что у нас есть около 70 коллекций (соответствующее количество таблиц искр), каждая из которых отправляет документы Mongo с полями от 10 до 450.
Если я могу преобразовать строку JSON в потоковом фрейме данных в объект JSON, схема которого может быть выведена из фрейма данных (что-то вроде того, что может сделать read.json), я могу использовать представление SQL * для извлечения отдельных столбцов, сделайте несколько манипуляции, а затем сохранить окончательный кадр данных в таблицу искры. Возможно ли это сделать? Какую ошибку я совершаю?
Примечание: Stram DF не поддерживает метод collect () для отдельного извлечения строки JSON из базового rdd и выполнения необходимых сравнений столбцов. Использование Spark 2.4 и Python в среде Azure Databricks 4.3.
Ниже приведен пример данных, которые я получаю в своей записной книжке после чтения событий из концентратора событий и приведения их к строке.
{
"documentKey": "5ab2cbd747f8b2e33e1f5527",
"collection": "configurations",
"operationType": "replace",
"fullDocument": {
"_id": "5ab2cbd747f8b2e33e1f5527",
"app": "7NOW",
"type": "global",
"version": "1.0",
"country": "US",
"created_date": "2018-02-14T18:34:13.376Z",
"created_by": "Vikram SSS",
"last_modified_date": "2018-07-01T04:00:00.000Z",
"last_modified_by": "Vikram Ganta",
"last_modified_comments": "Added new property in show_banners feature",
"is_active": true,
"configurations": [
{
"feature": "tip",
"properties": [
{
"id": "tip_mode",
"name": "Delivery Tip Mode",
"description": "Tip mode switches the display of tip options between percentage and amount in the customer app",
"options": [
"amount",
"percentage"
],
"default_value": "tip_percentage",
"current_value": "tip_percentage",
"mode": "multiple or single"
},
{
"id": "tip_amount",
"name": "Tip Amounts",
"description": "List of possible tip amount values",
"default_value": 0,
"options": [
{
"display": "No Tip",
"value": 0
}
]
}
]
}
]
}
}
Я бы хотел отделить и вынуть full_document
в примере выше. Когда я использую get_json_object, я получаю full_document в другом потоковом фрейме данных как строку JSON, а не как объект. Как вы можете видеть, в full_document есть несколько типов массивов, которые я могу раскрыть (в документации сказано, что explode поддерживается в потоковом DF, но еще не пробовал), но есть также некоторые объекты (например, тип структуры), которые я хотел бы извлечь для отдельных лиц. поля. Я не могу использовать нотацию SQL '*', потому что get_json_object возвращает строку, а не сам объект.