Ответ 1 (карта на несколько функций)
Вы технически правы. В случае map, map_asyn c и других вариантов вам следует использовать одну функцию.
Но это ограничение можно обойти, реализовав исполнителя и передав функцию для выполнения в составе параметров:
def dispatcher(args):
return args[0](*args[1:])
Итак, минимальный рабочий пример:
import multiprocessing as mp
def function_1(v):
print("hi %s"%v)
return 1
def function_2(v):
print("by %s"%v)
return 2
def dispatcher(args):
return args[0](*args[1:])
with mp.Pool(2) as p:
tasks = [
(function_1, "A"),
(function_2, "B")
]
r = p.map_async(dispatcher, tasks)
r.wait()
results = r.get()
Ответ 2 (планирование)
Я бы удалил время из сценария и запланировал задание cron ( вкл. GNU / Linux) ( на windows), так что ОС будет отвечать за его выполнение.
На Linux вы можете запустить cronotab -e
и добавить следующая строка, чтобы скрипт запускался каждые 5 минут.
*/5 * * * * python /path/to/script.py
Ответ 3 (Общий словарь)
да, но нет.
Насколько мне известно, использование Manager для Данные, такие как коллекции, это лучший способ. Для массивов или примитивных типов (int, float, e cc) существует Value
и Array
, которые быстрее .
Как в документации
Объект менеджера, возвращаемый Manager (), управляет процессом сервера, который содержит> 1073 * объектов и позволяет другим процессам манипулировать ими с помощью прокси.
Менеджер, возвращаемый Manager () будет поддерживать список типов, dict, Namespace, Lock,> RLock, семафор, BoundedSemaphore, условие, событие, барьер, очередь, значение и> массив.
Диспетчеры процессов сервера более гибки, чем объекты с общей памятью, поскольку их можно настроить для поддержки произвольных типов объектов. Также один менеджер может совместно использоваться процессами на разных компьютерах по сети. Однако они работают медленнее, чем использование общей памяти.
Но вам нужно только вернуть Dataframe, поэтому общий словарь ему не нужен.
Очищено Код
Используя все предыдущие идеи, код можно переписать в следующем виде:
версия карты
import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp
def worker1(t , seed):
print('worker1 is here.')
sleep(t)
np.random.seed(seed)
return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
def worker2(t , seed):
print('worker2 is here.')
sleep(t)
np.random.seed(seed)
return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
def dispatcher(args):
return args[0](*args[1:])
def task_generator(sleep_time=1):
seed = np.random.randint(0,1000,1)
yield worker1, sleep_time, seed
yield worker2, sleep_time, seed + 1
with mp.Pool(2) as p:
results = p.map(dispatcher, task_generator())
merged = pd.concat(results, axis=0)
print(merged)
Если происходит процесс объединения Dataframe является узким местом, подход с imap может стать оптимальным.
версия imap
with mp.Pool(2) as p:
merged = pd.DataFrame()
for result in p.imap_unordered(dispatcher, task_generator()):
merged = pd.concat([merged,result], axis=0)
print(merged)
Основное отличие состоит в том, что в случае карты программа сначала ожидает завершить все задачи процесса, а затем объединить все кадры данных.
В случае imap_unoredered, как только задача завершится, кадр данных объединяется с текущими результатами.