Задача оптимизации воздушного потока, которая передает данные из BigQuery в MongoDB - PullRequest
0 голосов
/ 29 мая 2020

Мне нужно повысить производительность задачи Airflow, которая передает данные из BigQuery в MongoDB. Соответствующая задача в моем DAG использует PythonOperator и просто вызывает следующую функцию python для передачи одной таблицы / коллекции:

def transfer_full_table(table_name):
    start_time = time.time()

    # (1) Connect to BigQuery + Mongo DB
    bq = bigquery.Client()
    cluster = MongoClient(MONGO_URI)
    db = cluster["dbname"]
    print(f'(1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')

    # (2)-(3) Run the BQ Queries
    full_query = f"select * from `gcpprojectid.models.{table_name}`"
    results1 = bq.query(full_query)
    print(f'(2) Queried BigQuery: {round(time.time() - start_time, 5)}')
    results = results1.to_dataframe()
    print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')

    # (4) Handle Missing DateTimes   # Can we refactor this into its own function?
    datetime_cols = [key for key in dict(results.dtypes) if is_datetime(results[key])]
    for col in datetime_cols:
        results[[col]] = results[[col]].astype(object).where(results[[col]].notnull(), None)
    print(f'(4) Resolved Datetime Issue: {round(time.time() - start_time, 5)}')

    # (5) And Insert Properly Into Mongo
    db[table_name].drop()
    db[table_name].insert_many(results.to_dict('records'))
    print(f'(5) Wrote to Mongo: {round(time.time() - start_time, 5)}')

DAG настроен для передачи множества таблиц из BigQuery. в MongoDB (одна передача для каждой задачи), и эта конкретная функция transfer_full_table предназначена для передачи всей отдельной таблицы, поэтому она просто:

  • запрашивает всю таблицу BQ
  • конвертирует до pandas, исправляет проблему типа
  • отбрасывает предыдущую коллекцию MongoDB и повторно вставляет

Я пытаюсь использовать эту функцию для таблицы размером 60 МБ, и вот производительность различных частей задания:

(1) Connected to BQ + Mongo: 0.0786
(2) Queried BigQuery: 0.80595
(3) Converted to Pandas DF: 87.2797
(4) Resolved Datetime Issue: 88.33461
(5) Wrote to Mongo: 213.92398

Шаги 3 и 5 занимают все время. Задача очень быстро подключается к BQ и Mon go (1), и BQ может очень быстро запросить эту таблицу на 60 МБ (2). Однако, когда я конвертирую в фрейм данных pandas (3) (необходимый для (4) для решения возникшей у меня проблемы type), этот шаг занимает ~ 86,5 секунды. Решение проблемы даты и времени происходит очень быстро (4), однако в конце удаление предыдущей коллекции MongoDB и повторная вставка нового кадра данных pandas в MongoDB (5) занимает (213,9 - 88,3) = ~ 125 секунд. .

Будем очень признательны за любые советы, будь то на стороне Pandas или MongoDB, как я могу оптимизировать эти два узких места!

1 Ответ

1 голос
/ 30 мая 2020

Короткий ответ заключается в том, что асинхронные операции портят ваше профилирование.

В документации на bq.query указано, что полученный объект google.cloud.bigquery.job.QueryJob является асинхронным запросить задание. Это означает, что после отправки запроса интерпретатор python не блокируется, пока вы не попытаетесь использовать результаты запроса с одним из синхронных методов QueryJob, to_dataframe(). Значительная часть наблюдаемых 87 секунд, вероятно, просто тратится на ожидание возврата запроса.

Вы можете дождаться завершения запроса, вызывая QueryJob.done итеративно, пока он не вернет true, а затем вызовите ваш второй оператор печати профилирования.

Это не совсем оптимизация вашего кода, но, надеюсь, поможет двигаться в правильном направлении. Возможно, вам может помочь некоторая настройка pandas туда и обратно, но я думаю, что большая часть вашего времени тратится на ожидание чтения / записи из ваших баз данных, и что более эффективная запись или большее количество небольших запросов будет быть вашим единственным вариантом сократить общее время.

...