Я реализую сценарий Python для извлечения существующих пользовательских данных из базы данных Google BigQuery, затем использую многопоточный подход, чтобы выполнить некоторые функциональные возможности веб-очистки для каждого пользователя и, наконец, сохранить результаты в другой таблице в BigQuery. Существует около 3,6 миллиона существующих пользовательских записей, и для каждого пользователя требуется максимум 40 секунд. Моя цель - иметь возможность обрабатывать 100 000 пользователей в день, поэтому мне нужен параллельный подход к обработке.
Я использую ThreadPoolExecutor
из модуля concurrent.futures
. После того, как определенное количество потоков завершит свою работу, исполнитель должен сохранить соответствующий пакет результатов обратно в BigQuery. Я вижу, что потоки продолжают выполнять свои функции очистки веб-страниц. Но через некоторое время (или с большим количеством потоков) они перестают хранить записи обратно в базу данных.
Сначала я думаю, что имел дело с некоторыми условиями гонки, связанными с очисткой партии результаты, но с тех пор я реализовал BoundedSemaphore
из модуля threading
для реализации подхода блокировки, который, как я считаю, решил исходную проблему. Но результаты все еще не надежно сохранены назад в базе данных. Так, может быть, я что-то пропустил?
Я мог бы воспользоваться помощью кого-то, у кого есть большой опыт работы с параллельной обработкой в Python. В частности, я запускаю сценарий на сервере Heroku, поэтому опыт работы с Heroku также может быть полезен. Спасибо!! Ниже приведен фрагмент моего кода:
service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
batch = []
lock = BoundedSemaphore()
futures = [executor.submit(user_with_friends, row) for row in users]
print("FUTURE RESULTS", len(futures))
for index, future in enumerate(as_completed(futures)):
#print(index)
result = future.result()
# OK, so this locking business:
# ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
# ... restricts a thread's ability to acquire access to the batch until another one has released it
lock.acquire()
batch.append(result)
if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
print("-------------------------")
print(f"SAVING BATCH OF {len(batch)}...")
print("-------------------------")
service.append_user_friends(batch) # stores the results in another table on BigQuery
batch = []
lock.release()
См. Также:
https://docs.python.org/3/library/concurrent.futures.html#concurrent .futures.ThreadPoolExecutor
https://docs.python.org/3.7/library/threading.html#threading .BoundedSemaphore