Вот ответ, который, я думаю, может сработать. Может быть, есть идеи получше?
import asyncio
from threading import Thread
from flask import Flask
from flask_restplus import Api, Resource
# Flask application object and the Flask-RESTPlus API wrapper that the
# resource classes below register their routes on.
app = Flask(__name__)
api = Api(
    app,
    version='1.0',
    title='Sample API',
    description='A sample API',
)
@api.route('/my-resource/', endpoint='my-resource')
class MyResource(Resource):
    """REST resource served at ``/my-resource/``."""

    def get(self):
        """Handle GET by returning the ``i`` attribute of the module-level ``cr``.

        Returns:
            dict: ``{"response": <current value of cr.i>}``.
        """
        current_count = cr.i
        return {"response": current_count}
def enable_coroutines(coroutines=None):
    """Run the two counter coroutines on a private event loop, blocking.

    Creates a new event loop, installs it as the current loop of the
    calling thread, schedules ``counter1()`` and ``counter2()`` on it and
    then blocks in ``run_forever()`` until the loop is stopped. On the way
    out, still-pending tasks are cancelled and the loop is closed.

    Args:
        coroutines: Object providing ``counter1()`` and ``counter2()``
            coroutine methods. When omitted, the module-level ``cr`` is
            used, so existing zero-argument callers keep working.
    """
    if coroutines is None:
        # Fall back to the shared instance created by the script entry point.
        coroutines = cr

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = []
    try:
        # Schedule on *this* loop explicitly instead of relying on the
        # implicit "current loop" lookup done by asyncio.ensure_future().
        # Strong references are kept because the event loop itself only
        # holds weak references to its tasks.
        tasks.append(loop.create_task(coroutines.counter1()))
        tasks.append(loop.create_task(coroutines.counter2()))
        loop.run_forever()
    finally:
        # Cancel whatever is still running so the loop is not closed with
        # pending tasks. Tasks that already finished are left untouched so
        # any exception they raised is still reported by asyncio.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.close()
class Coroutines:
    """Holds two integer counters that are advanced by endless coroutines.

    ``i`` starts at 0 and is advanced by ``counter1``; ``j`` starts at 9
    and is advanced by ``counter2``. Other code can read the attributes
    directly to observe the current values.
    """

    def __init__(self):
        self.i = 0
        self.j = 9

    async def counter1(self):
        """Endlessly increment ``self.i``, sleep one second, then print it.

        Each iteration prints the line ``My Coroutine`` followed by the
        current value of ``self.i``. Never returns on its own.
        """
        # The original body also assigned an unused local ``i = 0`` here;
        # it was never read and only obscured that ``self.i`` is the counter.
        while True:
            self.i += 1
            await asyncio.sleep(1)
            print("My Coroutine")
            print(self.i)

    async def counter2(self):
        """Endlessly increment ``self.j``, sleep one second, then print it.

        Never returns on its own.
        """
        while True:
            self.j += 1
            await asyncio.sleep(1)
            print(self.j)
if __name__ == '__main__':
    import os  # local import: only the script entry point needs it

    debug = True

    # Shared counter holder read by the REST resource and advanced by the
    # background coroutines.
    cr = Coroutines()

    # With debug enabled, Werkzeug's reloader executes this script twice: a
    # parent process that only watches files, and a child process (marked
    # with WERKZEUG_RUN_MAIN=true) that actually serves requests. Start the
    # counter thread only in the serving process so the coroutines do not
    # also run, and print, in the watcher.
    if not debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        coroutines_thread = Thread(target=enable_coroutines, daemon=True)
        coroutines_thread.start()

    app.run(debug=debug)