У меня есть требование, где будет иметься вызов API POST, прочитайте json данные из этого и опубликуйте sh сообщения на этот topi c "/ home / floor_1 / room" перед публикацией, необходимо подключиться к брокер также, а затем вернуть успешный ответ. Структура проекта:
│ app.py
│ config.py
│ Dockerfile
│ requirement.txt
│
├───app_services
│ │ __init__.py
│ │
│ ├───controller
│ │ │ send_down_link.py
│ │ │ __init__.py
│ │
│ ├───models
│ │ │ __init__.py
│ │ │
│ │ ├───database
│ │ │ │ device.py
В app.py
from app_services import app
if __name__ == '__main__':
app.run(host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG'])
в app_service / init .py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api
from flask_mqtt import Mqtt
from app_service.controller.send_down_link import SendDownlink
app = Flask(__name__)
db = SQLAlchemy(app)
mqtt = Mqtt(app)
api = Api(app)
api.add_resource(SendDownlink, "/api/t1/device/")
В контроллере / send_down_link .py:
class SendDownlink(Resource):
def post(self):
input_data = request.get_json()
message = input_data['message']
Я хочу опубликовать sh это сообщение в "/ home / floor_1 / room" topi c. Так что я просто хочу разместить mqtt-код в этом методе «post». Не знаю, как обойти это. Любое предложение?