Отправка данных другому уже запущенному процессу python через именованный канал - PullRequest
0 голосов
/ 04 февраля 2020

Я пытаюсь найти способы отправки данных в работающий процесс Python (3.7+) через именованный канал. Например, процесс добавляет префикс c энтузиастов к строке и печатает ее. Процесс имеет свои собственные функции и работает бесконечно. Кроме того, у него есть новые вещи. Так как впереди несколько задач, есть очередь:

from queue import Queue 
import time
import tkinter as tk 
import os
import asyncio
import win32pipe, win32file, pywintypes
import sys

q = Queue()

for i in range(5):
    q.put(i) #own tasks

class C_Window():
    def __init__(self, parent=None, windowname='Window'): 
        self.parent = parent
        self.root =  tk.Tk()
        self.sendBox = tk.Text(self.root, height=1, width=30)
        self.sendBox.pack()
        self.buttonCommit=tk.Button(self.root, height=1, width=10, text="Commit", 
                            command=lambda: self.commit_button())
        self.buttonCommit.pack()
        self.root.update()

    def commit_button(self):
        inputValue=self.sendBox.get("1.0","end-1c")
        self.sendBox.delete('1.0', tk.END)
        q.put(inputValue)

    async def async_tk(self):
        while True:
            self.root.update()
            await asyncio.sleep(0.25) 

async def async_main():
    while True:
        if q.empty():
            print ("relaxing")
        else: 
            print ("YEAY! ", q.get())
        await asyncio.sleep(1) 


if __name__ == '__main__':
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) # create_task() replaces ensure_future() in 3.7+ 
    maintask = asyncio.ensure_future(async_main()) 
    loop = asyncio.get_event_loop()
    loop.run_forever()

Это работает, как и ожидалось. Очередь получается, когда что-то добавляется через интерфейс, который выполняется, и если в очереди ничего нет расслабляющий.

Теперь я хотел бы добавить такты извне в очередь через именованный канал. Для этого я создал класс трубы:

class C_Pipe():
    def __init__(self): 
        self.pipe = win32pipe.CreateNamedPipe(r'\\.\pipe\mypipe',
                                    win32pipe.PIPE_ACCESS_DUPLEX,
                                    win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
                                    1,  # nMaxInstances
                                    65536,  # nOutBufferSize
                                    65536,  # nInBufferSize
                                    0, # 50ms timeout (the default)
                                    None) # securityAttributes

    async def async_pipe(self):
        win32pipe.ConnectNamedPipe(self.pipe)
        while True:
            try:
                msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
                print(msg)
                self.main_queue.put(msg)
            except pywintypes.error as e:
                if e.winerror == 109: #no process on other side OR Pipe closed
                    win32pipe.DisconnectNamedPipe(self.pipe)
                    print("Reconnecting pipe")
                    win32pipe.ConnectNamedPipe(self.pipe)
            else:
                raise
            await asyncio.sleep(0.25)   

, а затем попытался запустить его с:

if __name__ == '__main__':
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main()) 
    pipe_obj = C_Pipe()
    pipetask = asyncio.ensure_future(pipe_obj.async_pipe()) 
    loop = asyncio.get_event_loop()
    loop.run_forever()

, который не работает. Как только это происходит, пипетки ставят блоки во время чтения, и все зависает. Вот почему я попытался поместить его в отдельный поток:

if __name__ == '__main__':

    loop2 = asyncio.new_event_loop()
    pipe_obj = C_Pipe()
    pipetask = asyncio.run_coroutine_threadsafe(pipe_obj.async_pipe(),loop2) 

    loop = asyncio.get_event_loop()
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main()) 

    loop.run_forever()

, но канал получает 1 сообщение, а затем завершается неудачно, не помещая данные в очередь. Мои вопросы:

  1. Есть ли способ вывести данные в очередь из внешнего процесса (и избавиться от именованного канала)?
  2. Есть ли способ запустить именованный канал в его собственном потоке, чтобы он мог оставаться заблокированным?
  3. Действительно ли asyncio является правильным выбором здесь? Я много читал о asyncio и multiprocessing, но не могу найти четкую картинку

Большое спасибо!

1 Ответ

0 голосов
/ 04 февраля 2020
Есть ли способ запустить именованный канал в его собственном потоке, чтобы он мог оставаться заблокированным?

Это, вероятно, самый простой подход. Ваша попытка не удалась, потому что вы никогда не создавали другой поток. Вы создали два цикла событий и запустили только один. Идея с run_coroutine_threadsafe состоит в том, чтобы позволить потокам, не являющимся асинхронными, отправлять задания на (одиночное) событие asyncio l oop. Во-первых, связь основана на syn c API, поэтому она может оставаться syn c:

    # calling it sync_pipe, since it's no longer async
    def sync_pipe(self, enqueue):
        win32pipe.ConnectNamedPipe(self.pipe)
        while True:
            try:
                msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
                print(msg)
                enqueue(msg)
            except pywintypes.error as e:
                if e.winerror == 109: #no process on other side OR Pipe closed
                    win32pipe.DisconnectNamedPipe(self.pipe)
                    print("Reconnecting pipe")
                    win32pipe.ConnectNamedPipe(self.pipe)
            else:
                raise
            time.sleep(0.25)   

Обратите внимание, как мы отправили абстрактную функцию, которая определяет, как ставить в очередь что-то. При этом основной блок может выглядеть следующим образом:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def enqueue(item):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    pipe_obj = C_Pipe()
    pipethread = threading.Thread(target=pipe_obj.sync_pipe, args=(enqueue,))

    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main(queue)) 

    loop.run_forever()

async_main теперь получает очередь, к которой он поделится с sync_pipe.

...