итераторы Python Zip параллельно с использованием потоков - PullRequest
0 голосов
/ 16 сентября 2018

Скажем, у меня есть N генераторов, которые производят поток предметов gs = [..] # list of generators.

Я могу легко zip собрать их вместе, чтобы получить генератор кортежей от каждого соответствующего генератора в gs: tuple_gen = zip(*gs).

Это вызывает next(g) для каждого g в последовательности в gs и собирает результаты в кортеж.Но если каждый элемент стоит дорого, мы можем распараллелить работу next(g) в нескольких потоках.

Как я могу реализовать pzip(..), который это делает?

1 Ответ

0 голосов
/ 18 сентября 2018

То, что вы просили, может быть достигнуто путем создания генератора, который выдает результаты из apply_async -колоколов на ThreadPool.

К вашему сведению, я сравнил этот подход с pandas.read_csv -тераторами, которые вы получаете, указавпараметр chunksize.Я создал восемь копий csv-файла размером 1М и указал chunksize = 100_000.

Четыре файла были прочитаны с помощью предоставленного вами последовательного метода, четыре с функцией mt_gen ниже, используя пулчетыре потока:

  • однопоточный ~ 3,68 с
  • многопоточный ~ 1,21 с

Не означает, что это будетулучшайте результаты для каждого оборудования и настроек данных.

import time
import threading
from multiprocessing.dummy import Pool  # dummy uses threads


def _load_sim(x = 10e6):
    for _ in range(int(x)):
        x -= 1
    time.sleep(1)


def gen(start, stop):
    for i in range(start, stop):
        _load_sim()
        print(f'{threading.current_thread().name} yielding {i}')
        yield i


def multi_threaded(gens):
    combi_g = mt_gen(gens)
    for item in combi_g:
        print(item)


def mt_gen(gens):
    with Pool(N_WORKERS) as pool:
        while True:
            async_results = [pool.apply_async(next, args=(g,)) for g in gens]
            try:
                results = [r.get() for r in async_results]
            except StopIteration:  # needed for Python 3.7+, PEP 479, bpo-32670
                return
            yield results


if __name__ == '__main__':

    N_GENS = 10
    N_WORKERS = 4
    GEN_LENGTH = 3

    gens = [gen(x * GEN_LENGTH, (x + 1) * GEN_LENGTH) for x in range(N_GENS)]
    multi_threaded(gens)

Вывод:

Thread-1 yielding 0
Thread-2 yielding 3
Thread-4 yielding 6
Thread-3 yielding 9
Thread-1 yielding 12
Thread-2 yielding 15
Thread-4 yielding 18
Thread-3 yielding 21
Thread-1 yielding 24
Thread-2 yielding 27
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Thread-3 yielding 7
Thread-1 yielding 10
Thread-2 yielding 4
Thread-4 yielding 1
Thread-3 yielding 13
Thread-1 yielding 16
Thread-4 yielding 22
Thread-2 yielding 19
Thread-3 yielding 25
Thread-1 yielding 28
[1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
Thread-1 yielding 8
Thread-4 yielding 2
Thread-3 yielding 11
Thread-2 yielding 5
Thread-1 yielding 14
Thread-4 yielding 17
Thread-3 yielding 20
Thread-2 yielding 23
Thread-1 yielding 26
Thread-4 yielding 29
[2, 5, 8, 11, 14, 17, 20, 23, 26, 29]

Process finished with exit code 0
...