Python Многопроцессорное планирование - PullRequest
2 голосов
/ 23 января 2020

В Python 3.6 я параллельно запускаю несколько процессов, где каждый процесс пингует URL и возвращает Pandas фрейм данных. Я хочу продолжать (2+) процессы постоянно, я создал минимальный репрезентативный пример, как показано ниже.

Мои вопросы:

1) Я понимаю что, поскольку у меня разные функции, я не могу использовать Pool.map_async() и его варианты. Это правильно? Единственные примеры, которые я видел, повторяли одну и ту же функцию, например: этот ответ .

2) Как лучше всего запускать эту настройку бессрочно? В моем коде ниже я использую while l oop, который, я подозреваю, не подходит для этой цели.

3) Это способ, которым я использую Process а Manager оптимально? Я использую multiprocessing.Manager.dict() в качестве общего словаря для возврата результатов из процессов. В комментарии к этот ответ я увидел, что использование Queue здесь имеет смысл, однако у объекта Queue нет метода `.dict () '. Поэтому я не уверен, как это будет работать.

Буду благодарен за любые улучшения и предложения с примером кода .

import numpy as np
import pandas as pd
import multiprocessing
import time

def worker1(name, t , seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    time.sleep(t)
    np.random.seed(seed)
    df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

def worker2(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    np.random.seed(seed)
    time.sleep(t)
    df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

if __name__ == '__main__':
    t=1
    while True:

        start_time = time.time()
        manager = multiprocessing.Manager()
        parallel_dict = manager.dict()
        seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
        jobs = []
        p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
        p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
        jobs.append(p1)
        jobs.append(p2)
        p1.start()
        p2.start()
        for proc in jobs:
            proc.join()
        parallel_end_time = time.time() - start_time
        #print(parallel_dict)
        df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
        df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
        merged_df = pd.concat([df1,df2], axis=0)
        print(merged_df)

1 Ответ

3 голосов
/ 01 февраля 2020

Ответ 1 (карта на несколько функций)

Вы технически правы. В случае map, map_asyn c и других вариантов вам следует использовать одну функцию.

Но это ограничение можно обойти, реализовав исполнителя и передав функцию для выполнения в составе параметров:

def dispatcher(args):
    return args[0](*args[1:])

Итак, минимальный рабочий пример:

import multiprocessing as mp

def function_1(v):
    print("hi %s"%v)
    return 1

def function_2(v):
    print("by %s"%v)
    return 2

def dispatcher(args):
    return args[0](*args[1:])

with mp.Pool(2) as p:
    tasks = [
        (function_1, "A"),
        (function_2, "B")
    ]
    r = p.map_async(dispatcher, tasks)
    r.wait()
    results = r.get()

Ответ 2 (планирование)

Я бы удалил время из сценария и запланировал задание cron ( вкл. GNU / Linux) ( на windows), так что ОС будет отвечать за его выполнение.

На Linux вы можете запустить cronotab -e и добавить следующая строка, чтобы скрипт запускался каждые 5 минут.

*/5 * * * * python /path/to/script.py

Ответ 3 (Общий словарь)

да, но нет.

Насколько мне известно, использование Manager для Данные, такие как коллекции, это лучший способ. Для массивов или примитивных типов (int, float, e cc) существует Value и Array , которые быстрее .

Как в документации

Объект менеджера, возвращаемый Manager (), управляет процессом сервера, который содержит> 1073 * объектов и позволяет другим процессам манипулировать ими с помощью прокси.

Менеджер, возвращаемый Manager () будет поддерживать список типов, dict, Namespace, Lock,> RLock, семафор, BoundedSemaphore, условие, событие, барьер, очередь, значение и> массив.

Диспетчеры процессов сервера более гибки, чем объекты с общей памятью, поскольку их можно настроить для поддержки произвольных типов объектов. Также один менеджер может совместно использоваться процессами на разных компьютерах по сети. Однако они работают медленнее, чем использование общей памяти.

Но вам нужно только вернуть Dataframe, поэтому общий словарь ему не нужен.

Очищено Код

Используя все предыдущие идеи, код можно переписать в следующем виде:

версия карты

import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp

def worker1(t , seed):
    print('worker1 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))


def worker2(t , seed):
    print('worker2 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

def dispatcher(args):
    return args[0](*args[1:])

def task_generator(sleep_time=1):
    seed = np.random.randint(0,1000,1)
    yield worker1, sleep_time, seed    
    yield worker2, sleep_time, seed + 1

with mp.Pool(2) as p:
    results = p.map(dispatcher, task_generator())
    merged = pd.concat(results, axis=0)
    print(merged)

Если происходит процесс объединения Dataframe является узким местом, подход с imap может стать оптимальным.

версия imap

with mp.Pool(2) as p:
    merged = pd.DataFrame()
    for result in p.imap_unordered(dispatcher, task_generator()):
        merged = pd.concat([merged,result], axis=0)
    print(merged)

Основное отличие состоит в том, что в случае карты программа сначала ожидает завершить все задачи процесса, а затем объединить все кадры данных.

В случае imap_unoredered, как только задача завершится, кадр данных объединяется с текущими результатами.

...