Я пытаюсь найти способы отправки данных в работающий процесс Python (3.7+) через именованный канал. Например, процесс добавляет префикс c энтузиастов к строке и печатает ее. Процесс имеет свои собственные функции и работает бесконечно. Кроме того, у него есть новые вещи. Так как впереди несколько задач, есть очередь:
from queue import Queue
import time
import tkinter as tk
import os
import asyncio
import win32pipe, win32file, pywintypes
import sys
q = Queue()
for i in range(5):
q.put(i) #own tasks
class C_Window():
def __init__(self, parent=None, windowname='Window'):
self.parent = parent
self.root = tk.Tk()
self.sendBox = tk.Text(self.root, height=1, width=30)
self.sendBox.pack()
self.buttonCommit=tk.Button(self.root, height=1, width=10, text="Commit",
command=lambda: self.commit_button())
self.buttonCommit.pack()
self.root.update()
def commit_button(self):
inputValue=self.sendBox.get("1.0","end-1c")
self.sendBox.delete('1.0', tk.END)
q.put(inputValue)
async def async_tk(self):
while True:
self.root.update()
await asyncio.sleep(0.25)
async def async_main():
while True:
if q.empty():
print ("relaxing")
else:
print ("YEAY! ", q.get())
await asyncio.sleep(1)
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk()) # create_task() replaces ensure_future() in 3.7+
maintask = asyncio.ensure_future(async_main())
loop = asyncio.get_event_loop()
loop.run_forever()
Это работает, как и ожидалось. Очередь получается, когда что-то добавляется через интерфейс, который выполняется, и если в очереди ничего нет расслабляющий.
Теперь я хотел бы добавить такты извне в очередь через именованный канал. Для этого я создал класс трубы:
class C_Pipe():
def __init__(self):
self.pipe = win32pipe.CreateNamedPipe(r'\\.\pipe\mypipe',
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1, # nMaxInstances
65536, # nOutBufferSize
65536, # nInBufferSize
0, # 50ms timeout (the default)
None) # securityAttributes
async def async_pipe(self):
win32pipe.ConnectNamedPipe(self.pipe)
while True:
try:
msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
print(msg)
self.main_queue.put(msg)
except pywintypes.error as e:
if e.winerror == 109: #no process on other side OR Pipe closed
win32pipe.DisconnectNamedPipe(self.pipe)
print("Reconnecting pipe")
win32pipe.ConnectNamedPipe(self.pipe)
else:
raise
await asyncio.sleep(0.25)
, а затем попытался запустить его с:
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
pipe_obj = C_Pipe()
pipetask = asyncio.ensure_future(pipe_obj.async_pipe())
loop = asyncio.get_event_loop()
loop.run_forever()
, который не работает. Как только это происходит, пипетки ставят блоки во время чтения, и все зависает. Вот почему я попытался поместить его в отдельный поток:
if __name__ == '__main__':
loop2 = asyncio.new_event_loop()
pipe_obj = C_Pipe()
pipetask = asyncio.run_coroutine_threadsafe(pipe_obj.async_pipe(),loop2)
loop = asyncio.get_event_loop()
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
loop.run_forever()
, но канал получает 1 сообщение, а затем завершается неудачно, не помещая данные в очередь. Мои вопросы:
- Есть ли способ вывести данные в очередь из внешнего процесса (и избавиться от именованного канала)?
- Есть ли способ запустить именованный канал в его собственном потоке, чтобы он мог оставаться заблокированным?
- Действительно ли asyncio является правильным выбором здесь? Я много читал о
asyncio
и multiprocessing
, но не могу найти четкую картинку
Большое спасибо!