Запишите результаты Google Api в озеро данных с помощью Databricks - PullRequest
0 голосов
/ 11 апреля 2019

Я получаю данные об использовании пользователя от API администратора отчетов об использовании пользователей через Python SDK для Databricks. Размер данных составляет около 100 000 записей в день, которые я делаю ночью через пакетный процесс. API возвращает максимальный размер страницы 1000, поэтому я грубо называю его 1000, чтобы получить данные, которые мне нужны за день. Это работает нормально.

Моя конечная цель - сохранить данные в необработанном формате в озере данных (Azure Gen2, но не имеет отношения к этому вопросу). Позже я преобразую данные с помощью Databricks в модель агрегированной отчетности и добавлю к ним PowerBI для отслеживания использования Google App с течением времени.

Как программист C #, я новичок в Python и Spark: мой текущий подход заключается в том, чтобы запросить первую страницу из 1000 записей из API, а затем записать ее в набор данных непосредственно в виде файла JSON, затем получить следующий набор страниц и напиши это тоже. Структура папок будет выглядеть примерно так: \ raw \ googleuser \ YYYY \ MM \ DD \ data1.json.

Я бы хотел сохранить данные в их наиболее сырой форме в необработанной зоне и не применять слишком много преобразований. 2-й процесс может извлечь нужные мне поля, пометить их метаданными и записать обратно как Parquet, готовый к использованию по функциям. Вот почему я думаю написать его в формате JSON.

Это означает, что 2-й процесс должен прочитать JSON в фрейм данных, где я могу преобразовать его и записать как паркет (эта часть также проста).

Поскольку я использую Google Api, я не работаю с Json - он возвращает объекты dict (со сложным вложением). Я могу извлечь его в виде строки Json, используя json.dump (), но не могу понять, как записать STRING непосредственно в мой набор данных. Как только я помещаю его в фрейм данных, я могу легко записать его в любом формате, однако для преобразования его из Json в фрейм данных и, по сути, обратно в Json просто написать его.

Вот что я попробовал и результаты:

  1. Создайте список pyspark.sql.Rows и в конце всех страниц (100k строк) - используйте spark.createDataFrame (lines), чтобы превратить его в фрейм данных. Как только это будет фрейм данных, я могу сохранить его как файл Json. Это работает, но кажется неэффективным.
  2. Используйте json.dump (запрос), чтобы получить строку из 1000 записей в Json. Я могу записать его в файловую систему Databricks, используя этот код:

    with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json" .format(keyDateFilter), 'w') as f: f.write(json.dumps(response))

    Однако мне нужно переместить его в озеро данных Azure с помощью:

    dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path + dbfs_path + "xyz.json")

    Тогда я получаю следующие 1000 записей и продолжаю делать это. Кажется, я не могу использовать каталог метода open () для хранилища озера данных (драйвер abfss Azure), или это было бы достойным решением. Это кажется хрупким и странным - сначала сбросить его локально, а затем переместить.

  3. То же, что и в варианте 1, но выдает дамп кадра данных для передачи данных каждые 1000 записей и перезаписывает его (чтобы объем памяти не увеличивался более чем на 1000 записей одновременно)

  4. Игнорировать правило сброса сырого Json. Массаж данных в самый простой формат, который я хочу, и избавиться от всех дополнительных данных, которые мне не нужны. Это приведет к гораздо меньшей занимаемой площади, и тогда будет использован вариант 1 или 3 выше. (Это второй вопрос - принцип сохранения всех данных из API в его необработанном формате, так что, поскольку требования со временем меняются, у меня всегда есть исторические данные в озере данных, и я могу просто изменить подпрограммы преобразования для извлечения различных метрик из это. Следовательно, я не хочу отбрасывать любые данные на этом этапе.

Любой совет приветствуется, пожалуйста ...

1 Ответ

1 голос
/ 11 апреля 2019

Смонтируйте озеро в вашей среде блоков данных, чтобы вы могли просто сохранить его на озеро, как если бы это была обычная папка:

with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json', 'wb') as f:
            json.dump(data, codecs.getwriter('utf-8')(f), sort_keys = True, indent = 4, ensure_ascii=False)
            f.close()

Вам необходимо монтировать озеро только один раз:

https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake-gen2.html#mount-the-azure-data-lake-storage-gen2-filesystem-with-dbfs

При этом

Хранение больших данных в формате json не является оптимальным;для каждого значения (ячейки) вы храните ключ (имя столбца), поэтому ваши данные будут намного больше, чем нужно.Кроме того, у вас, вероятно, должна быть функция дедупликации, чтобы гарантировать, что (1) нет пробелов в данных, и (2) вы не храните одни и те же данные в нескольких файлах.Об этом позаботится Дельта Кирпичей данных.

https://docs.databricks.com/delta/delta-intro.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...