РЕДАКТИРОВАТЬ 3: См. Последний пример в конце.
Мне нужно время l oop выполнение непрерывных операций отправки и возврата через USB-соединение. Во время этой непрерывной операции мне нужно (среди прочего в моем основном сценарии) несколько идентичных и изолированных операций отправки / возврата на том же USB-соединении. Похоже, это требует многопроцессорной обработки и некоторых настроек.
Я хочу использовать следующий обходной путь с библиотекой многопроцессорности:
- Поместите непрерывную операцию отправки / возврата в другой поток с пулом (apply_asyn c).
- Переведите этот процесс в режим ожидания, когда я выполняю изолированную операцию отправки / возврата (с использованием clear ()).
- Сразу после изолированной операции отправки / возврата возобновить непрерывную отправку / возврат (используя set ()).
- Остановить непрерывную отправку / возврат, когда я дойду до конца основного скрипта (здесь у меня еще нет решения, должно быть x.stop () или что-то в этом роде вот так, поскольку terminate () не работает).
- Получить какое-то возвращаемое значение из остановленного процесса (используйте get ()).
Я уже пробовал пару вещей, но я просто не могу выйти из while l oop через главную команду.
import multiprocessing
import time
def setup(event):
global unpaused
unpaused = event
class func:
def __init__(self):
self.finished = False
def stop(self):
self.finished = True
def myFunction(self, arg):
i = 0
s=[]
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s
if __name__ == "__main__":
x=func()
event = multiprocessing.Event() # initially unset, so workers will be paused at first
pool = multiprocessing.Pool(1, setup, (event,))
result = pool.apply_async(x.myFunction, (10,))
print('We unpause for 2 sec')
event.set() # unpause
time.sleep(2)
print('We pause for 2 sec')
event.clear() # pause
time.sleep(2)
print('We unpause for 2 sec')
event.set() # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))
Может ли кто-нибудь указать мне правильное направление? Как видно, это не остановится с помощью x.stop (). Глобальные значения также не работают.
Заранее спасибо.
EDIT:
как и было предложено, я попытался поместить многопроцессорность в отдельный объект. Сделано ли это путем помещения функций в класс, как в моем примере ниже?
import multiprocessing
import time
class func(object):
def __init__(self):
self.event = multiprocessing.Event() # initially unset, so workers will be paused at first
self.pool = multiprocessing.Pool(1, self.setup, (self.event,))
def setup(self):
global unpaused
unpaused = self.event
def stop(self):
self.finished = True
def resume(self):
self.event.set() # unpause
def hold(self):
self.event.clear() #pause
def run(self, arg):
self.pool.apply_async(self.myFunction, (arg,))
def myFunction(self, arg):
i = 0
s=[]
self.finished = False
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s
if __name__ == "__main__":
x=func()
result = x.run(10)
print('We unpause for 2 sec')
x.resume() # unpause
time.sleep(2)
print('We pause for 2 sec')
x.hold() # pause
time.sleep(2)
print('We unpause for 2 sec')
x.resume() # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))
Я добавил функцию удержания и возобновления и поместил функцию настройки в один класс. Но в нижнем примере функция больше не запускается. Какая сложная маленькая проблема. Я озадачен этим.
EDIT2: Я пробовал обходной путь с тем, что я нашел до сих пор. При использовании библиотеки microprocessing.pool возникла большая проблема. Это непросто использовать с USB-соединением ... Я создал посредственный обходной путь ниже:
from multiprocessing.pool import ThreadPool
import time
class switch:
state = 1
s1 = switch()
def myFunction(arg):
i = 0
while s1.state == 1 or s1.state == 2 or s1.state == 3:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close()
pool.join()
elif s1.state == 3:
while s1.state == 3:
time.sleep(1)
print('holding (state 3)')
return s
if __name__ == "__main__":
global s
s=[]
print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
print('we switched for a snippet which is')
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we hold it')
time.sleep(5)
s1.state = 3
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))
EDIT 3:
from multiprocessing.pool import ThreadPool
import time
class switch:
state = 1
s1 = switch()
def myFunction(arg):
i = 0
while s1.state == 1 or s1.state == 2:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close() #These are not relevant i guess.
pool.join() #These are not relevant i guess.
return s
if __name__ == "__main__":
global s
s=[]
print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))
Ну, вот что я придумал с ... Не велико не страшно. Может быть, немного ужаснее (:
Ненавижу то, что мне приходится вспоминать функцию pool.apply_asyn c (myFunction, (10,)) после того, как я схватил один фрагмент данных. В настоящее время работает только ThreadingPool без каких-либо дальнейших изменений кода в моем фактическом скрипте!