Мне нужно повысить производительность задачи Airflow, которая передает данные из BigQuery в MongoDB. Соответствующая задача в моем DAG
использует PythonOperator
и просто вызывает следующую функцию python для передачи одной таблицы / коллекции:
def transfer_full_table(table_name):
start_time = time.time()
# (1) Connect to BigQuery + Mongo DB
bq = bigquery.Client()
cluster = MongoClient(MONGO_URI)
db = cluster["dbname"]
print(f'(1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')
# (2)-(3) Run the BQ Queries
full_query = f"select * from `gcpprojectid.models.{table_name}`"
results1 = bq.query(full_query)
print(f'(2) Queried BigQuery: {round(time.time() - start_time, 5)}')
results = results1.to_dataframe()
print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')
# (4) Handle Missing DateTimes # Can we refactor this into its own function?
datetime_cols = [key for key in dict(results.dtypes) if is_datetime(results[key])]
for col in datetime_cols:
results[[col]] = results[[col]].astype(object).where(results[[col]].notnull(), None)
print(f'(4) Resolved Datetime Issue: {round(time.time() - start_time, 5)}')
# (5) And Insert Properly Into Mongo
db[table_name].drop()
db[table_name].insert_many(results.to_dict('records'))
print(f'(5) Wrote to Mongo: {round(time.time() - start_time, 5)}')
DAG настроен для передачи множества таблиц из BigQuery. в MongoDB (одна передача для каждой задачи), и эта конкретная функция transfer_full_table
предназначена для передачи всей отдельной таблицы, поэтому она просто:
- запрашивает всю таблицу BQ
- конвертирует до pandas, исправляет проблему типа
- отбрасывает предыдущую коллекцию MongoDB и повторно вставляет
Я пытаюсь использовать эту функцию для таблицы размером 60 МБ, и вот производительность различных частей задания:
(1) Connected to BQ + Mongo: 0.0786
(2) Queried BigQuery: 0.80595
(3) Converted to Pandas DF: 87.2797
(4) Resolved Datetime Issue: 88.33461
(5) Wrote to Mongo: 213.92398
Шаги 3 и 5 занимают все время. Задача очень быстро подключается к BQ и Mon go (1), и BQ может очень быстро запросить эту таблицу на 60 МБ (2). Однако, когда я конвертирую в фрейм данных pandas (3) (необходимый для (4) для решения возникшей у меня проблемы type
), этот шаг занимает ~ 86,5 секунды. Решение проблемы даты и времени происходит очень быстро (4), однако в конце удаление предыдущей коллекции MongoDB и повторная вставка нового кадра данных pandas в MongoDB (5) занимает (213,9 - 88,3) = ~ 125 секунд. .
Будем очень признательны за любые советы, будь то на стороне Pandas или MongoDB, как я могу оптимизировать эти два узких места!