Проблема хранения данных в BigQuery с использованием многопоточного подхода в Python - PullRequest
0 голосов
/ 09 апреля 2020

Я реализую сценарий Python для извлечения существующих пользовательских данных из базы данных Google BigQuery, затем использую многопоточный подход, чтобы выполнить некоторые функциональные возможности веб-очистки для каждого пользователя и, наконец, сохранить результаты в другой таблице в BigQuery. Существует около 3,6 миллиона существующих пользовательских записей, и для каждого пользователя требуется максимум 40 секунд. Моя цель - иметь возможность обрабатывать 100 000 пользователей в день, поэтому мне нужен параллельный подход к обработке.

Я использую ThreadPoolExecutor из модуля concurrent.futures. После того, как определенное количество потоков завершит свою работу, исполнитель должен сохранить соответствующий пакет результатов обратно в BigQuery. Я вижу, что потоки продолжают выполнять свои функции очистки веб-страниц. Но через некоторое время (или с большим количеством потоков) они перестают хранить записи обратно в базу данных.

Сначала я думаю, что имел дело с некоторыми условиями гонки, связанными с очисткой партии результаты, но с тех пор я реализовал BoundedSemaphore из модуля threading для реализации подхода блокировки, который, как я считаю, решил исходную проблему. Но результаты все еще не надежно сохранены назад в базе данных. Так, может быть, я что-то пропустил?

Я мог бы воспользоваться помощью кого-то, у кого есть большой опыт работы с параллельной обработкой в ​​Python. В частности, я запускаю сценарий на сервере Heroku, поэтому опыт работы с Heroku также может быть полезен. Спасибо!! Ниже приведен фрагмент моего кода:

service = BigQueryService() # a custom class defined elsewhere

users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    batch = []
    lock = BoundedSemaphore()
    futures = [executor.submit(user_with_friends, row) for row in users]
    print("FUTURE RESULTS", len(futures))
    for index, future in enumerate(as_completed(futures)):
        #print(index)
        result = future.result()

        # OK, so this locking business:
        # ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
        # ... restricts a thread's ability to acquire access to the batch until another one has released it
        lock.acquire()
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
            print("-------------------------")
            print(f"SAVING BATCH OF {len(batch)}...")
            print("-------------------------")
            service.append_user_friends(batch) # stores the results in another table on BigQuery
            batch = []
        lock.release()

См. Также:

https://docs.python.org/3/library/concurrent.futures.html#concurrent .futures.ThreadPoolExecutor

https://docs.python.org/3.7/library/threading.html#threading .BoundedSemaphore

1 Ответ

0 голосов
/ 11 апреля 2020

Итак, я использовал другой подход (см. Ниже), который работает более надежно. Старый подход координировался между потоками для хранения результатов, в то время как новый подход обрабатывает и сохраняет партию для потока.

Когда я значительно увеличиваю количество потоков до числа, подобного 2500, скрипт практически не прекращает сохранять результаты (такое поведение я бы хотел еще изучить), но я могу запустить это при относительно низком количестве потоков, и это делает работу.

...