Python concurrent.futures - TypeError: аргумент zip # 1 должен поддерживать итерацию - PullRequest
0 голосов
/ 30 сентября 2019

Я хочу обрабатывать документы mongodb в пакете 1000 с использованием многопроцессорной обработки. Однако приведенный ниже фрагмент кода дает TypeError: zip argument #1 must support iteration

Код:

def documents_processing(skip):
    conn = get_connection()
    db = conn["db_name"]

    print("Process::{} -- db.Transactions.find(no_cursor_timeout=True).skip({}).limit(10000)".format(os.getpid(), skip))
    documents = db.Transactions.find(no_cursor_timeout=True).skip(skip).limit(10000)
    # Do some processing in mongodb


max_workers = 20


def skip_list():
    for i in range(0, 100000, 10000):
        yield [j for j in range(i, i + 10000, 1000)]


def main_f():
    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            executor.map(documents_processing, skip_list)
    except Exception:
        print("exception:", traceback.format_exc())

main_f()

Ошибка трассировки:

(rpc_venv) [user@localhost ver2_mt]$ python main_mongo_v3.py 
exception: Traceback (most recent call last):
  File "main_mongo_v3.py", line 113, in main_f
    executor.map(documents_processing, skip_list)
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 496, in map
    timeout=timeout)
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 137, in _get_chunks
    it = zip(*iterables)
TypeError: zip argument #1 must support iteration

Как исправить эту ошибку? Благодарю.

1 Ответ

1 голос
/ 30 сентября 2019

Вызвать функцию skip_list для возврата генератора.

В настоящее время вы передаете функцию в качестве второго аргумента, а не итерацию.

executor.map(documents_processing, skip_list())

Поскольку выизвлекая 10 тыс. документов в каждом процессе, начиная с n , вы можете объявить skip_list как:

def skip_list():
    for i in range(0, 100000, 10000):
        yield i
...