Вы на правильном пути, пытаясь решить свою проблему с помощью асинхронного программирования.Асинхронное программирование в Python само по себе сложно, поскольку есть принципиальные различия в параллелизме, реализованном с использованием потоков (threading
), процессов (multiprocessing
) или сопрограмм (asyncio
).Не существует «правильного» подхода, вы выбираете метод, который наилучшим образом соответствует текущему варианту использования.
В вашей задаче есть задачи, связанные с IO (выборка и запись данных), и задачи с CPU (обработка данных), которыеможет работать независимо друг от друга параллельно.Вот как вы можете это сделать.Возможно, это не самое элегантное решение, но оно покажет вам, как вы можете решать такие проблемы.
В нашем решении мы будем использовать потоки для задач, связанных с вводом-выводом, и процессы для задач, связанных с процессором.Лично я предпочел бы использовать потоки для всех задач, но в этом случае мы не сможем раскрыть всю мощь современных многоядерных процессоров для распараллеливания обработки данных из-за GIL .
Сначала давайте импортируем необходимые модули в наш исполняемый скрипт:
import time
import random
import signal
from threading import Thread
from multiprocessing.pool import Pool
from queue import Queue, Empty
Решаемая нами проблема - это проблема производитель-потребитель .Основной поток извлекает данные через фиксированные промежутки времени и помещает их в очередь.Поток процессора получает данные из очереди и передает их в пул рабочих для обработки, затем собирает результаты и помещает их в другую очередь.Эта очередь постоянно читается записывающим потоком, который, наконец, сохраняет данные.Теперь мы добавим несколько констант - количество рабочих процессов, выполняемых параллельно, и интервал выборки данных в секундах:
WORKERS = 4
FETCH_INTERVAL = 1
Ниже приведен основной поток, который отвечает за выборку данных каждые FETCH_INTERVAL
сек.бесконечный цикл:
def main():
raw_data = Queue()
processor = Thread(target=process, args=(raw_data,))
processor.start()
i = 0
try:
while True:
t_fetch = time.time()
# Simulate the data fetching:
time.sleep(0.5)
data = i, random.random()
print("[main] Fetched raw data:", data)
raw_data.put(data)
t_elapsed = time.time() - t_fetch
if t_elapsed < FETCH_INTERVAL:
time.sleep(FETCH_INTERVAL - t_elapsed)
else:
print("[error] The fetch interval is too short!")
i = i + 1
except KeyboardInterrupt:
print("shutting down...")
finally:
raw_data.put(None)
processor.join()
if __name__ == "__main__":
main()
Мы начинаем с определения очереди raw_data
, в которой будут храниться извлеченные данные, и запуска потока processor
, который запускает функцию process
, которая принимает raw_data
очередь в видеего аргумент.Обратите внимание, что мы не просто спим FETCH_INTERVAL
секунд после каждой выборки данных, но мы учитываем задержки, вызванные выборкой данных, поскольку это также задача, связанная с вводом-выводом.Сценарий выполняется до тех пор, пока не будет нажата Ctrl-C
.После прерывания мы помещаем None
в очередь, чтобы сообщить потокам, что обработка завершена, и ожидаем завершения потока processor
.Теперь мы добавляем определение функции process
, которая выполняется потоком processor
:
def process(raw_data):
proc_data = Queue()
writer = Thread(target=write, args=(proc_data,))
writer.start()
with Pool(WORKERS, init_worker) as pool:
while True:
data_batch = dequeue_data(raw_data, batch_size=WORKERS)
if not data_batch:
time.sleep(0.5)
continue
results = pool.map(process_data, data_batch)
print("[processor] Processed raw data:", results)
for r in results:
proc_data.put(r)
if None in data_batch:
break
print("joining the writer thread...")
writer.join()
Здесь мы создаем очередь proc_data
, в которой будут храниться результаты обработки данных для writer
нить.Поток writer
запускает функцию write
, которую мы определим позже.После запуска потока writer
мы создаем pool
из WORKERS
процессов.Здесь мы используем функцию init_worker
в качестве Pool
инициализатора процесса, чтобы игнорировать клавиатурные прерывания в рабочих процессах при их обработке в главном потоке:
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
После создания пула процессов мы входим вбесконечный цикл, постоянно выводящий пакеты данных из очереди raw_data
путем вызова функции dequeue_data
, которую мы определим ниже.Пакеты данных затем передаются в рабочий пул для обработки.Функция process_data
будет определена ниже.Затем мы собираем результаты и помещаем их в очередь proc_data
, которая читается потоком writer
.Если в пакете данных есть None
, обработка прерывается, и мы ожидаем завершения потока writer
.Функция dequeue_data
определяется следующим образом:
def dequeue_data(data_queue, batch_size):
items = []
for _ in range(batch_size):
try:
item = data_queue.get(block=False)
except (KeyboardInterrupt, Empty):
break
items.append(item)
return items
Здесь вы видите, что она просто пытается извлечь и вернуть максимум batch_size
точек данных из data_queue
.Если данных нет, возвращается пустой список.Функция process_data
ничего не делает, но спит 1-5 секунд:
def process_data(data):
if data is None:
return
# Simulate the data processing:
time.sleep(random.randint(1, 5))
return data
Наконец, мы определяем функцию write
, которая выполняется в потоке writer
:
def write(proc_data):
while True:
data = proc_data.get()
if data is None:
break
# Simulate the data writing:
time.sleep(random.randint(1, 2))
print("[writer] Wrote processed data:", data)
Бесконечный цикл останавливается, как только он получает None
из очереди proc_data
.Теперь мы сохраняем весь предоставленный код в одном скрипте, затем запускаем и проверяем его результаты:
[main] Fetched raw data: (0, 0.8092310624924178)
[main] Fetched raw data: (1, 0.8594148294409398)
[main] Fetched raw data: (2, 0.9059856675215566)
[main] Fetched raw data: (3, 0.5653361157057876)
[main] Fetched raw data: (4, 0.8966396309003691)
[main] Fetched raw data: (5, 0.5772344067614918)
[processor] Processed raw data: [(0, 0.8092310624924178)]
[main] Fetched raw data: (6, 0.4614411399877961)
^Cshutting down...
[writer] Wrote processed data: (0, 0.8092310624924178)
[processor] Processed raw data: [(1, 0.8594148294409398), (2, 0.9059856675215566), (3, 0.5653361157057876), (4, 0.8966396309003691)]
[writer] Wrote processed data: (1, 0.8594148294409398)
[writer] Wrote processed data: (2, 0.9059856675215566)
[processor] Processed raw data: [(5, 0.5772344067614918), (6, 0.4614411399877961), None]
joining the writer thread...
[writer] Wrote processed data: (3, 0.5653361157057876)
[writer] Wrote processed data: (4, 0.8966396309003691)
[writer] Wrote processed data: (5, 0.5772344067614918)
[writer] Wrote processed data: (6, 0.4614411399877961)
Поток main
извлекает данные через фиксированные промежутки времени, в то время как processor
обрабатывает данные в пакетах параллельно, а writer
сохраняет результаты. Как только мы нажали Ctrl-C
, поток main
прекратил извлечение данных, затем поток processor
завершил обработку остальных извлеченных данных и начал ждать, пока поток writer
завершит запись данных на диск.