Python3 Asyncio и почтовый запрос - PullRequest
0 голосов
/ 07 ноября 2018

Используя Python3.7, у меня есть два сценария py. server_execute.py должен принять почтовый запрос, а после его принятия он должен вызвать server_scripts.py, который должен запускать тестовые сценарии асинхронно.

Сценарий server_execute.py примет пост-запрос, но я не могу заставить его запустить асинхронную функцию server_scripts.py для выполнения тестов. Я новичок в размещении запросов и асинхронном программировании ... Любая помощь очень ценится, спасибо!

server_execute.py

#!/usr/bin/python3
import server_scripts
from aiohttp import web
import json

global e2e


async def handle(request):
    response_obj = {'status': 'success'}
    return web.Response(text=json.dumps(response_obj), status=200)

#in python command prompt enter the following for the post request:
# import requests
# response = requests.post("http://localhost:8080/e2e?name=222")
# response.json()


async def e2e(request):
    try:
        e2e = request.query['name']
        print("Running e2e tests:", e2e)
        server_scripts.runPyTest222("test")
        print("Parallel script execution completed.")
        response_obj = {'status': 'success', 'message': 'e2e successfully run'}
        return web.Response(text=json.dumps(response_obj), status=200)
    except Exception as e:
        response_obj = {'status': 'failed', 'message': str(e)}
        return web.Response(text=json.dumps(response_obj), status=500)


app = web.Application()
app.router.add_get('/', handle)
app.router.add_post('/e2e', e2e)

web.run_app(app)

server_scripts.py

#!/usr/bin/python3
import asyncio
import time
import datetime
from colorama import Fore
from colorama import Style


async def run(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()

        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            if proc.returncode == 0:
                print(f'[stdout]\n{Fore.GREEN}{stdout.decode()}{Style.RESET_ALL}!')
            else:
                print(f'[stdout]\n{Fore.RED}{stdout.decode()}{Style.RESET_ALL}!')
        if stderr:
            print(f'[stderr]\n{Fore.RED}{stderr.decode()}{Style.RESET_ALL}!')


async def runPyTest222(filename):
    print('started at this time:', time.strftime('%X'))
    await asyncio.gather(
        run('python3 -m pytest test_222_rlv_overview_005.py'),
        run('python3 -m pytest test_222_ltv_overview_002.py'),
    )
    print('finished at', time.strftime('%X'))


if __name__ == '__main__':
    a = datetime.datetime.now()
    asyncio.run(runPyTest222("test"))
    b = datetime.datetime.now()
    print(b - a)
...