Использование нескольких потоков и Popen для взаимодействия с приложениями на основе CLI - PullRequest
0 голосов
/ 06 апреля 2020

До сих пор я использовал pexpect для взаимодействия с приложениями CLI. Для ограниченной цели, это служит только право. Однако теперь я хочу перейти за pexpect, чтобы иметь возможность создавать более сложный сценарий взаимодействия ios с большим контролем. Для этого я использую subprocess API Popen и многопоточную модель.

Идея состоит в том, чтобы иметь основной поток (поток eval в приведенном ниже фрагменте) и поток процесса. Поток процесса просто принимает входное сообщение (через Queue) и направляет его в stdin и передает stdout обратно в поток eval, который затем может решить, что делать дальше.

Проблема (ы):

  1. Обработка sudo:

Если я использую Popen с аргументом shell=False и токенизировать параметры команды, использующие shlex.split (), я не получаю строку терминатора (с параметром -S выходные данные не отображаются, без него выводится запрос пароля, но не возвращается в поток eval).

Если я использую его с shell=True и не токенизирую команду, я застрял в сценарии без вывода, и потоки не будут двигаться вперед.

Я также попытался поиграться с bufsize параметр, но, похоже, не помогает.

Может кто-нибудь помочь мне здесь?

def run_process_thread(thread_group, thread_name, from_parent, to_parent):
    print("Started thread: {a}".format(a = thread_name))
    thread_group.threads[thread_name]['started'] = True
    proc = None


    while True:
        #First, check if we have input from parent. If we do, process locally, or 
        # send to child thread
        #If we have something from input, then we don't care about output from proc
        # thread yet
        buffer = '' 
        try:
            input_msg = from_parent.get(block=False)
            print('Process thread got from parent: {a}\n'.format(a=input_msg['operation']), end='')
            if input_msg['operation']=='shutdown_runner':
                #Do something about the proc thread!
                if proc is not None:
                    proc.terminate()
                break

            print('Running: {a}'.format(a = input_msg['operation']))
            if proc is None:
#                 proc = subprocess.Popen(shlex.split(input_msg['operation']),
#                             bufsize=0, 
#                             stdout=subprocess.PIPE,
#                             stderr=subprocess.STDOUT,
#                             stdin = subprocess.PIPE,
#                             universal_newlines=True,
#                             shell=False)
                proc = subprocess.Popen(input_msg['operation'],
                            bufsize=0, 
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            stdin = subprocess.PIPE,
                            universal_newlines=True,
                            shell=True)
                thread_group.threads[thread_name]['handle'] = proc
            else:
                #Write to stdin
                #print(input_msg['operation'], proc.stdin, flush=True)
                proc.stdin.write(input_msg['operation']+'\n')

            #Read output
            print('Read output now')
            #for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):
            for line in iter(proc.stdout.readline, ''):
                buffer += line
            print('Dispatch: {a}'.format(a=buffer))
            to_parent.put(buffer)

            from_parent.task_done()
        except queue.Empty:
            pass
            #print('Nothing to read from input queue')    
    print('Stopping proc thread')
    proc.stdout.close()
    proc.stderr.close()
    proc.terminate()
    try:
        proc.wait(timeout=1)
        print('==subprocess exited with rc =', proc.returncode)
    except subprocess.TimeoutExpired:
        print('subprocess dod not terminate in time')

def run_eval_thread(thread_group, thread_name, sequence={}):
    print("Started thread: {a}".format(a = thread_name))
    thread_group.threads[thread_name]['started'] = True

    to_child = queue.Queue()
    from_child =  queue.Queue()
    thread_group.threads['proc'] = {'instance': threading.Thread(target = run_process_thread,
                                             args = (thread_group, 'proc', to_child, from_child)),
                                    'queues': {'to_parent': from_child,
                                               'from_parent': to_child,},
                                    'started': False,
                                    }
    #thread_group.threads['proc']['instance'].daemon = True
    thread_group.threads['proc']['instance'].start() #Start process thread

    to_run = ['sudo ls -lrt', 'pwd']
    history = []
    while True:
        #First, check if we have input from parent. If we do, process locally, or 
        # send to child thread
        #If we have something from input, then we don't care about output from proc
        # thread yet 
        try:
            input = thread_group.threads['eval']['queues']['from_parent'].get(block=False)
            print('Eval thread got from parent: {a}\n'.format(a=input['operation']), end='')
            if input['operation']=='shutdown_runner':
                #Do something about the proc thread!
                #signal the proc threads to terminate
                to_child.put(input)
                break
        except queue.Empty:
            pass

        try:
            #send next command to proc thread
            cmd = to_run.pop(0)
            #print(cmd)
            to_child.put({'operation': cmd})
            #print(input['operation'], thread_group.threads['proc']['handle'].stdin, flush=True)
            #print('Nothing to read from input queue')

            #Then the input side queue for signals from proc thread
            output = from_child.get(block=True)
            print('Eval thread got: {a}'.format(a=output), end='')

            #Add to history
            history.append(cmd)

        except IndexError:
            #Ran all commands
            print('Ran all commands')
            to_child.put({'operation':'shutdown_runner'})
            break
        except queue.Empty:
            pass
            #print('Nothing to read from proc queue')
        except:
            print('Error in to_run.put')


    print('Stopping eval thread')

...