Python 3.7 Стратегия многопоточности - PullRequest
0 голосов
/ 18 февраля 2020

У меня есть рабочая нагрузка, которая состоит из очень медленного запроса, который возвращает ОГРОМНОЕ количество данных, которые необходимо проанализировать и рассчитать, и все это на все oop. По сути, это выглядит следующим образом:

for x in lastTenYears
    myData = DownloadData(x)             # takes about   ~40-50 [sec]
    parsedData.append(ParseData(myData)) # takes another +30-60 [sec]

Как я полагаю, вы заметили, что если бы я мог запустить анализ данных в потоке, я мог бы загрузить следующий пакет данных, пока выполняется анализ.

Как мне добиться такого параллелизма операций?

В идеале я хотел бы, чтобы 1 поток всегда загружался, а N потоков выполнял синтаксический анализ. Часть загрузки на самом деле является запросом к базе данных, поэтому нехорошо иметь кучу параллельных из них ...

Подробности:
Синтаксический анализ данных сильно привязан к процессору , а
состоит из необработанных математических вычислений и ничего более.

Использование Python 3.7.4

Ответы [ 4 ]

1 голос
/ 18 февраля 2020

1) Использовать потокобезопасную очередь. Queue.FIFOQueue . На верхнем уровне определите

my_queue = Queue.FIFOQueue()
parsedData = []

2) В первом потоке начните загрузку данных

my_queue.put(DownloadData(x))

Во втором потоке

if not (my_queue.empty()):
    myData = my_queue.get()
    parsedData.append(ParseData(myData))
1 голос
/ 18 февраля 2020

Если ваша программа связана с процессором, вам будет сложно делать что-то еще в других потоках из-за GIL (глобальной блокировки интерпретатора).

Вот ссылка на статью, которая может помочь вам понять topi c: https://opensource.com/article/17/4/grok-gil

Загрузка данных в подпроцесс, скорее всего, лучший подход.

1 голос
/ 18 февраля 2020

Трудно сказать, поможет ли и насколько это реально (поскольку мне нечего проверять ...), но вы можете попробовать multiprocessing.Pool. Он обрабатывает всю грязную работу за вас, и вы можете настроить количество процессов, размер куска и т. Д. c.

from multiprocessing import Pool

def worker(x):
    myData = DownloadData(x)
    return ParseData(myData)

if __name__ == "__main__":
    processes = None  # defaults to os.cpu_count()
    chunksize = 1

    with Pool(processes) as pool:
        parsedData = pool.map(worker, lastTenYears, chunksize)

Здесь для примера я использую метод map, но в зависимости от ваших потребностей вы можете использовать imap или map_async.

0 голосов
/ 19 февраля 2020

Q : Как мне достичь этого параллелизма операций?

Шаг номер один состоит в Следует понимать, что запрошенный выше вариант использования не является [PARALLEL] выполнением кода, а неупорядоченным пакетом ограниченного выполнения политики использования ресурсов строгой последовательности пар:

First -a- remote-[DB-Query] (возвращение (цит.) ОГРОМНОЕ количество данных )
Далее -a- local-[CPU-process] (из (cit.) ОГРОМНОЕ количество данных только что вернулось сюда)

Задержка первого может быть в маске
(если это было разрешено, , но не разрешено - из-за желания не перегружать DB-хост) ,
задержка для второго не
(может запускаться, но следующий связанный с вводом / выводом запрос DB, но только если не нарушает правило хранения DB-машины, но под небольшой рабочей нагрузкой) .

Как я полагаю, вы заметили , если бы я мог выполнить анализ данных в потоке, я мог бы загрузить следующую партию данных, пока выполняется синтаксический анализ.

Это самое время до Проясните ситуацию:

Факты:

A )
Задачи, связанные с процессором никогда не будут выполнены работать быстрее в любом количестве N потоков в python -GIL-lock-управляемой экосистеме (с тех пор и всегда, как выразился Гвидо Россум) ,
как GIL-блокировка обеспечивает повторную [SERIAL] -изацию, поэтому чем больше потоков «работает», тем больше потоков фактически ожидает получения GIL-блокировки, прежде чем они «получат» его, но для 1 / ( N + 1 ) -ая доля времени полученного, благодаря GIL-замку, снова чистая - [SERIAL], длительность N * ( 30 - 60 ) [sec]

B )
Задача, связанная с вводом / выводом, не имеет смысла выгружать в полное параллельное выполнение на основе процесса, как полную копию процесса python (в Windows также с дублированием полное состояние интерпретатора python со всеми данными во время создания экземпляра подпроцесса) не имеет смысла, поскольку существуют более разумные методы для обработки, связанной с вводом-выводом (где GIL-блокировка не причиняет столько вреда.

C)
Вся концепция N-parsing : 1-querying принципиально неверна - максимально достижимая цель - замаскировать задержку процесса ввода-вывода (где создание в смысле), но здесь каждый запрос занимает упомянутых ~ 40-50 [sec], поэтому второй пакет данных для анализа здесь никогда не будет присутствовать до запуска упомянутых ~ 40-50 [sec] в следующий раз, поэтому
ни один второй работник никогда не сможет что-либо проанализировать в любое время до T0 + ~ 80~100 [sec] - так что можно мечтать о том, чтобы sh N - (несвязанные) -работники, работающие (но имеющие их, но фактически ожидающие данные), возможны, но ужасно непродуктивно (хуже для N- (GIL-MUTEX-ed) - "ожидающих" -агентов).

...