Почему pool.map не запускает новый процесс? - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь запустить метод-член в отдельном подпроцессе. Метод-член фактически действует как диспетчер обратных вызовов l oop, где он выполняет каждый обратный вызов.

Проблема, с которой я столкнулся, заключается в том, что обратные вызовы никогда не вызываются. Метод, который отвечает за порождение нового процесса, выполняется, но я не получаю вывода / сигнала от фактического диспетчера l oop или самого обратного вызова.

Новый процесс создается с помощью этого фрагмента:

def run_callback_disptacher(self):
    pool = pathos.multiprocessing.Pool(1)
    pool.map(self.execute_callbacks, [])
    print('run_callback_disptacher executed')

, а обратный вызов диспетчера выглядит следующим образом:

def execute_callbacks(self):
    print(f'dispatcher list: {self.callback_list}')
    while True:
        for callback in self.callback_list:
            callback()

Я использую модуль pathos.multiprocessing в Windows10 и это минимальный пример, демонстрирующий проблему:

import time
import torch 
import torchvision
from torchvision import models
from torch.utils import mkldnn as mkldnn_utils
import pathos

class SomeClass():
    def __init__(self, model_name='r18', use_jit=False, use_mkldnn=False, device='cpu'):
        self.model_name = model_name
        self.use_jit = use_jit
        self.use_mkldnn = use_mkldnn 
        self.device = device
        self.callback_list = []
        self.is_running = False
        self._init_model()

    def _init_model(self):
        if self.model_name == 'r18':
            self.model = models.resnet18(pretrained=True)
        elif self.model_name == 'r50':
            self.model = models.resnet50(pretrained=True)
        else:
            raise Exception(f"Model name: '{self.model_name}' is not recognized.")

        self.model = self.model.to(self.device)
        self.model.eval()

        if self.use_mkldnn:
            self.model = mkldnn_utils.to_mkldnn(self.model)
        if self.use_jit: 
            self.model = self.load_jit_model()

    def load_jit_model(self, jit_path = 'model.jit'):
        dummy_input = torch.tensor(torch.rand(size=(1, 3, 224, 224)))
        model = torch.jit.trace(self.model, dummy_input)
        torch.jit.save(model, jit_path)
        return torch.jit.load(jit_path)

    def add_callback(self, callback):
        self.callback_list.append(callback)

    def remove_callback(self, callback):
        self.callback_list.remove(callback)

    def get_callbacks(self):
        return self.callback_list

    def execute_callbacks(self):
        print(f'dispatcher list: {self.callback_list}')
        while True:
            for callback in self.callback_list:
                callback()

    def start(self):
        self.is_running = True
        while self.is_running:
            # simulating a generic operation here
            time.sleep(0.2)
        print('start ended!')

    def stop(self):
        self.is_running = False

    def run_callback_disptacher(self):
        pool = pathos.multiprocessing.Pool(1)
        pool.map(self.execute_callbacks, [])
        print('run_callback_disptacher executed')

и так он называется:

import threading
import time
from minimal_example import SomeClass

def simple_callback():
    print('hello from simple callback')

def start():
    obj = SomeClass(model_name='r18', use_jit=False, use_mkldnn=False, device='cpu')
    obj.add_callback(simple_callback)
    obj.run_callback_disptacher()
    starter = threading.Thread(target=obj.start)
    starter.start()
    time.sleep(5)
    print(obj.get_callbacks())
    obj.stop()
    print('Done!')

if __name__ == '__main__':
    start()

Что мне здесь не хватает?

...