Как выйти из процесса while l oop в многопроцессорном пуле из основного скрипта в python - PullRequest
2 голосов
/ 03 августа 2020

РЕДАКТИРОВАТЬ 3: См. Последний пример в конце.

Мне нужно время l oop выполнение непрерывных операций отправки и возврата через USB-соединение. Во время этой непрерывной операции мне нужно (среди прочего в моем основном сценарии) несколько идентичных и изолированных операций отправки / возврата на том же USB-соединении. Похоже, это требует многопроцессорной обработки и некоторых настроек.

Я хочу использовать следующий обходной путь с библиотекой многопроцессорности:

  1. Поместите непрерывную операцию отправки / возврата в другой поток с пулом (apply_asyn c).
  2. Переведите этот процесс в режим ожидания, когда я выполняю изолированную операцию отправки / возврата (с использованием clear ()).
  3. Сразу после изолированной операции отправки / возврата возобновить непрерывную отправку / возврат (используя set ()).
  4. Остановить непрерывную отправку / возврат, когда я дойду до конца основного скрипта (здесь у меня еще нет решения, должно быть x.stop () или что-то в этом роде вот так, поскольку terminate () не работает).
  5. Получить какое-то возвращаемое значение из остановленного процесса (используйте get ()).

Я уже пробовал пару вещей, но я просто не могу выйти из while l oop через главную команду.

   import multiprocessing
   import time

   def setup(event):
       global unpaused
       unpaused = event
   
   class func:
       def __init__(self):
           self.finished = False
       
       def stop(self):
           self.finished = True
           
       def myFunction(self, arg):
           i = 0
           s=[]
           while self.finished == False:
               unpaused.wait()
               print(arg+i)
               s.append(arg+i)
               i=i+1
               time.sleep(1)
           return s
       
   if __name__ == "__main__":
       x=func()
       event = multiprocessing.Event() # initially unset, so workers will be paused at first
       pool = multiprocessing.Pool(1, setup, (event,))
       result = pool.apply_async(x.myFunction, (10,))
       print('We unpause for 2 sec')
       event.set()   # unpause
       time.sleep(2)
       print('We pause for 2 sec')
       event.clear() # pause
       time.sleep(2)
       print('We unpause for 2 sec')
       event.set()   # unpause
       time.sleep(2)
       print('Now we try to terminate in 2 sec')
       time.sleep(2)
       x.stop()
       return_val = result.get()
       print('get worked with '+str(return_val))

Может ли кто-нибудь указать мне правильное направление? Как видно, это не остановится с помощью x.stop (). Глобальные значения также не работают.

Заранее спасибо.

EDIT:

как и было предложено, я попытался поместить многопроцессорность в отдельный объект. Сделано ли это путем помещения функций в класс, как в моем примере ниже?

import multiprocessing
import time

class func(object):    
   def __init__(self):
       self.event = multiprocessing.Event() # initially unset, so workers             will be paused at first
       self.pool = multiprocessing.Pool(1, self.setup, (self.event,))
       
   def setup(self):
       global unpaused
       unpaused = self.event

   def stop(self):
       self.finished = True
      
   def resume(self):
       self.event.set() # unpause
       
   def hold(self):
       self.event.clear() #pause

   def run(self, arg):
       self.pool.apply_async(self.myFunction, (arg,))
       
   def myFunction(self, arg):
       i = 0
       s=[]
       self.finished = False
       while self.finished == False:
          unpaused.wait()
          print(arg+i)
           s.append(arg+i)
           i=i+1
           time.sleep(1)
       return s
   
if __name__ == "__main__":
   x=func()
   result = x.run(10)   
   print('We unpause for 2 sec')
   x.resume()   # unpause
   time.sleep(2)
   print('We pause for 2 sec')
   x.hold() # pause
   time.sleep(2)
   print('We unpause for 2 sec')
   x.resume()   # unpause
   time.sleep(2)
   print('Now we try to terminate in 2 sec')
   time.sleep(2)
   x.stop()
   return_val = result.get()
   print('get worked with '+str(return_val))

Я добавил функцию удержания и возобновления и поместил функцию настройки в один класс. Но в нижнем примере функция больше не запускается. Какая сложная маленькая проблема. Я озадачен этим.

EDIT2: Я пробовал обходной путь с тем, что я нашел до сих пор. При использовании библиотеки microprocessing.pool возникла большая проблема. Это непросто использовать с USB-соединением ... Я создал посредственный обходной путь ниже:

from multiprocessing.pool import ThreadPool
import time

class switch:
    state = 1
s1 = switch()

def myFunction(arg):
   i = 0

   while s1.state == 1 or s1.state == 2 or s1.state == 3:
       if s1.state == 1:
           print(arg+i)
           s.append(arg+i)
           i=i+1
           time.sleep(1)
       elif s1.state == 2:
           print('we entered snippet mode (state 2)')
           time.sleep(1)
           x = s
           return x
           pool.close()
           pool.join()
       elif s1.state == 3:
           while s1.state == 3:
               time.sleep(1) 
               print('holding (state 3)')
   return s


if __name__ == "__main__":
    global s
    s=[]
    
    print('we set the state in the class on top to ' +str(s1.state))
    pool = ThreadPool(processes=1)
    async_result = pool.apply_async(myFunction, (10,))
    print('in 5 sec we switch mode sir, buckle up')
    time.sleep(5)
    s1.state = 2
    print('we switched for a snippet which is')
    snippet = async_result.get()
    print(str(snippet[-1])+' this snippet comes from main')
    time.sleep(1)
    print('now we return to see the full list in the end')
    s1.state = 1
    async_result = pool.apply_async(myFunction, (10,))
    print('in 5 sec we hold it')
    time.sleep(5)
    s1.state = 3
    print('in 5 sec we exit')
    time.sleep(5)
    s1.state = 0
    return_val = async_result.get()
    print('Succsses if you see a list of numbers '+ str(return_val))

EDIT 3:

from multiprocessing.pool import ThreadPool
import time

class switch:
    state = 1
s1 = switch()

def myFunction(arg):
   i = 0

   while s1.state == 1 or s1.state == 2:
       if s1.state == 1:
           print(arg+i)
           s.append(arg+i)
           i=i+1
           time.sleep(1)
       elif s1.state == 2:
           print('we entered snippet mode (state 2)')
           time.sleep(1)
           x = s
           return x
           pool.close() #These are not relevant i guess.
           pool.join() #These are not relevant i guess.

   return s


if __name__ == "__main__":
    global s
    s=[]

    print('we set the state in the class on top to ' +str(s1.state))
    pool = ThreadPool(processes=1)
    async_result = pool.apply_async(myFunction, (10,))
    print('in 5 sec we switch mode sir, buckle up')
    time.sleep(5)
    s1.state = 2
    snippet = async_result.get()
    print(str(snippet[-1])+' this snippet comes from main')
    time.sleep(1)
    print('now we return to see the full list in the end')
    s1.state = 1
    async_result = pool.apply_async(myFunction, (10,))
    print('in 5 sec we exit')
    time.sleep(5)
    s1.state = 0
    return_val = async_result.get()
    print('Succsses if you see a list of numbers '+ str(return_val))

Ну, вот что я придумал с ... Не велико не страшно. Может быть, немного ужаснее (:

Ненавижу то, что мне приходится вспоминать функцию pool.apply_asyn c (myFunction, (10,)) после того, как я схватил один фрагмент данных. В настоящее время работает только ThreadingPool без каких-либо дальнейших изменений кода в моем фактическом скрипте!

1 Ответ

1 голос
/ 04 августа 2020

В ситуации, когда мне нужно, чтобы процесс работал непрерывно, а иногда выполнял другие действия, я предпочитаю использовать asyncio. Это грубый набросок того, как я подхожу к этому

import asyncio


class MyObject:
    def __init__(self):
        self.mydatastructure = []
        self.finished = False
        self.loop = None
        
    async def main_loop(self):
        while not self.finished:
            new_result = self.get_data()
            self.mydatastructure.append(new_result)
            await asyncio.sleep(0)
            
    async def timed_loop(self):
        while not self.finished:
            await asyncio.sleep(2)
            self.dotimedtask(self.mydatastructure)
    
    async def run(self):
        await asyncio.gather(self.main_loop(), self.timed_loop())


asyncio.run(MyObject().run())

, только одна сопрограмма будет работать одновременно, а синхронизированная одна будет планироваться каждые 2 секунды. Он всегда будет получать данные из самого последнего непрерывного выполнения. вы также можете делать такие вещи, как поддерживать соединение с объектом. В зависимости от ваших требований (это 2-секундный интервал или раз в две секунды, независимо от того, сколько времени это займет), существуют пакеты библиотек, чтобы сделать планирование более элегантным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...