Как поделиться состоянием при использовании одновременных фьючерсов - PullRequest
1 голос
/ 06 мая 2019

Мне известно, что используя традиционную многопроцессорную библиотеку, я могу объявить значение и разделить состояние между процессами.

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#sharing-state-between-processes

При использовании более новой библиотеки concurrent.futures как я могу поделиться состоянием между моими процессами?

import concurrent.futures

def get_user_object(batch):
    # do some work
    counter = counter + 1
    print(counter)

def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        threadingResult = executor.map(get_user_object, batches)

def run():
    data_pools = get_data()
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        processResult = executor.map(do_multithreading, data_pools)
    end = time.time()
    print("TIME TAKEN:", end - start)

if __name__ == '__main__':
    run()

Я хочу сохранить синхронизированное значение этого счетчика.

В предыдущей библиотеке я мог использовать multiprocessing.Value и Lock.

1 Ответ

2 голосов
/ 06 мая 2019

Вы можете передать initializer и initargs на ProcessPoolExecutor так же, как на multiprocessing.Pool.Вот пример:

import concurrent.futures
import multiprocessing as mp


def get_user_object(batch):
    with _COUNTER.get_lock():
        _COUNTER.value += 1
        print(_COUNTER.value, end=' ')


def init_globals(counter):
    global _COUNTER
    _COUNTER = counter


def main():
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
        initializer=init_globals, initargs=(counter,)
    ) as executor:
        for _ in executor.map(get_user_object, range(10)):
            pass
    print()


if __name__ == "__main__":
    import sys
    sys.exit(main())

Использование:

$ python3 glob_counter.py 
1 2 4 3 5 6 7 8 10 9 

Где:

  • for _ in executor.map(get_user_object, range(10)): позволяет перебирать каждый результат ,В этом случае get_user_object() возвращает None, поэтому вам не нужно ничего обрабатывать;вы просто pass и не предпринимаете никаких дальнейших действий.
  • Последний вызов print() дает вам дополнительный символ новой строки, поскольку исходный вызов print() не использует символ новой строки (end=' ' ')
...