Получение последних строк потокового stdout из подпроцесса Python - PullRequest
0 голосов
/ 29 мая 2020

Моя цель: Прочитать последний «кусок» (N строк) потокового вывода каждые M секунд из подпроцесса.

Текущий код:

  1. запустить подпроцесс
  2. читает стандартный вывод
  3. как только у меня есть кусок из N строк, распечатать его (или сохранить как текущий)
  4. подождите M секунд
  5. повтор
  6. Я также поместил код на момент завершения подпроцесса (который является бесконечным потоком, пока вы не нажмете Ctrl- C)

Я хочу достичь после того, как я подожду M секунд, если он всегда будет читать строки latest N, а не последующие N строк в stdout (их можно отбросить как Меня интересуют только последние)

Моя конечная цель - создать поток для запуска процесса и сохранить последние строки, а затем вызывать из основного процесса всякий раз, когда мне нужны последние результаты потока .

Любая помощь будет принята с благодарностью!

#!/usr/bin/env python3
import signal
import time
from subprocess import Popen, PIPE

sig = signal.SIGTERM

N=9
M=5

countlines=0
p = Popen(["myprogram"], stdout=PIPE, bufsize=1, universal_newlines=True)

chunk=[]

for line in p.stdout:
    countlines+=1
    chunk.append(line)

    if len(chunk)==N:
        print(chunk)
        chunk=[]
        time.sleep(M)

    if countlines>100:
        p.send_signal(sig)
        break

print("done")

Ответы [ 2 ]

0 голосов
/ 29 мая 2020

Вот еще одно возможное решение. Это программа, которую вы должны запустить как отдельный процесс в конвейере, который представляет REST API, который при запросе будет возвращать последние N строк, прочитанных на стандартном вводе (где N и номер порта предоставляются на стандартном вводе). Он использует run в flask, поэтому не должен использоваться в ситуациях, когда внешний мир имеет доступ к порту локального сервера для выполнения запросов, хотя это можно адаптировать.

import sys
import time
import threading
import argparse
from flask import Flask, request
from flask_restful import Resource, Api


class Server:

    def __init__(self):
        self.data = {'at_eof': False,
                     'lines_read': 0,
                     'latest_lines': []}
        self.thread = None
        self.args = None
        self.stop = False


    def parse_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("num_lines", type=int,
                            help="number of lines to cache")
        parser.add_argument("port", type=int,
                            help="port to serve on")
        self.args = parser.parse_args()


    def start_updater(self):
        def updater():
            lines = self.data['latest_lines']
            while True:
                if self.stop:
                    return
                line = sys.stdin.readline()
                if not line:
                    break
                self.data['lines_read'] += 1
                lines.append(line)
                while len(lines) > self.args.num_lines:
                    lines.pop(0)
            self.data['at_eof'] = True
        self.thread = threading.Thread(target=updater)
        self.thread.start()


    def get_data(self):
        return self.data


    def shutdown(self):
        self.stop = True
        func = request.environ.get('werkzeug.server.shutdown')
        if func:
            func()
            return 'Shutting down'
        else:
            return 'shutdown failed'


    def add_apis(self, app):

        class GetData(Resource):
            get = self.get_data

        class Shutdown(Resource):
            get = self.shutdown            

        api = Api(app)
        api.add_resource(GetData, "/getdata")
        api.add_resource(Shutdown, "/shutdown")


    def run(self):
        self.parse_args()
        self.start_updater()        
        app = Flask(__name__)
        self.add_apis(app)
        app.run(port=self.args.port)


server = Server()
server.run()

Пример использования: вот тестовая программа, вывод которой мы хотим обслуживать:

import sys
import time

for i in range(100):
    print("this is line {}".format(i))
    sys.stdout.flush()
    time.sleep(.1)

И простой конвейер для ее запуска (здесь из приглашения оболочки linux, но может быть выполнен через subprocess.Popen), обслуживающий последний 5 строк, на порту 8001:

python ./writer.py  | python ./server.py 5 8001

Пример запроса, здесь используется curl в качестве клиента, но это можно сделать через Python requests:

$ curl -s http://localhost:8001/getdata
{"at_eof": false, "lines_read": 30, "latest_lines": ["this is line 25\n", "this is line 26\n", "this is line 27\n", "this is line 28\n", "this is line 29\n"]}

сервер также предоставляет URL-адрес http://localhost:<port>/shutdown для его завершения, хотя, если вы вызовете его до того, как впервые увидите "at_eof": true, то ожидайте, что писатель отправит d ie со сломанным каналом.

0 голосов
/ 29 мая 2020

После долгих поисков я наткнулся на решение здесь:

https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/

Код Эли «Запускать, взаимодействовать, получать вывод в реальном времени, завершать» раздел у меня работал. Пока что это самое элегантное решение, которое я нашел.

Адаптировано к моей проблеме выше и написано внутри класса (здесь не показан):

def output_reader(self,proc):
    chunk=[]
    countlines=0
    for line in iter(proc.stdout.readline, b''):
        countlines+=1
        chunk.append(line.decode("utf-8"))
        if countlines==N:
            self.current_chunk = chunk
            chunk=[]
            countlines=0

def main():
    proc = subprocess.Popen(['myprocess'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        for i in range(10):
            time.sleep(1) # waits a while before getting latest lines
            print(self.current_chunk)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()
...