Как избежать состояния гонки при использовании ThreadPoolExecutor - PullRequest
2 голосов
/ 04 февраля 2020

У меня есть следующий метод concurrent_api_call_and_processing (), который принимает следующие параметры:

  • api_call: это HTTP-запрос к внешнему веб-сайту, который извлекает, а документ XLM
  • lst: является список целых чисел (идентификаторов), необходимых для api_call
  • callback_processing: локальный метод, который просто анализирует каждый XLM-запрос

Я делаю около 500 HTTP-запросов, по одному для каждого идентификатора в lst, using api_call () Затем каждый ответ обрабатывается локальным методом callback_processing (), который анализирует XLM и возвращает кортеж

def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call to API_Provider for each entry.
    : param lst: List of finding's ids needed by the API function to call API_Provider endpoint.
    :param callback_processing: Function that will be called after we get the response from the above  API call.
    : param workers: Number of concurrent threads that will be used.
    :return: array of tuples containing the details of each particular finding.
    """

    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {id} generated and exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output

Я начал замечать некоторые случайные проблемы (не изящное завершение) при использовании этого метода .

Поскольку я использовал массив вместо очереди (output=[]), но сомневался, что у меня может быть состояние гонки, я решил провести рефакторинг кода и начать использовать Queue (output=Queue)

Мой вопрос:

  • Является ли мой код, как сейчас, свободным от состояния гонки?

ПРИМЕЧАНИЕ: я хотел обратите внимание, что следуйте Рэймонд Хеттингер, Keynote on Concurrency, PyBay 2017 , я добавил fuzz() методы сна для тестирования, но не смог определить, действительно ли у меня было состояние гонки или нет.

Ответы [ 2 ]

1 голос
/ 04 февраля 2020

При указанных выше условиях в этом коде не будет условия гонки. В соответствии с документами concurrent.futures здесь происходит следующее:

  1. executor.submit (): возвращает объект Future, представляющий выполнение вызываемого объекта.
  2. as_completed (future_to_f_detail): возвращает итератор для экземпляров Future, предоставленных future_to_f_detail, который возвращает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы).

Так что действительно для l oop потребляется итератор и возвращая одно за другим каждое будущее, которое является выходом, с помощью as_completed ()

Так что, если только call_back () или функция, которую мы вызвали, не представляют какую-то функциональность asyn c (как в примере, описанном @ dm03514 выше), мы просто работаем синхронно после for l oop

   counter = 0
   with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            print(f"Entering the for loop for {counter+1} time") 
            counter +=1
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {id} generated and exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.append(f_det)
    return output

Если у нас есть массив 500 идентификаторов, и мы делаем 500 вызовов, и все вызовы дают будущее, мы напечатаем сообщение в печати 500 время, один раз перед входом в попытку l oop.

Мы не обязаны использовать Очередь, чтобы избежать условия гонки в этом случае. как таковые. Futures создает отложенное исполнение, когда мы используем submit, мы получаем будущее, которое будет использовано позже

Некоторые важные замечания и рекомендации:

  1. Ramalho, Luciano, Fluent Python, глава 17 Параллелизм с будущим.
  2. Бизли, Дэвид: Python Поваренная книга Глава 12 Параллелизм. Страница 516 Определение и задание актера
1 голос
/ 04 февраля 2020

Я не думаю, что достаточно информации, чтобы определить это.

Рассмотрим, что произойдет, если вы передадите функцию api_call, которая увеличивает глобальную переменную:

count = 0
def api_call_fn():
  global count 
  count += 1

При одновременном выполнении она будет иметь переменную count, увеличивающую условие гонки.

То же самое относится и к функции callback_processing.


Чтобы проверить, свободен ли этот код от состояния гонки, мы должны увидеть определение обеих этих функций :)

...