Я пытаюсь разработать микросервис python, который может обрабатывать запросы REST API, а также обрабатывать сообщения от брокера kafka.
Мой REST-контроллер Tornado выглядит следующим образом:
class HelloHandler(RequestHandler):
def get(self):
self.write({'message': 'hello world'})
def make_app():
urls = [("/", HelloHandler)]
return Application(urls)
def tornado_thread():
app = make_app()
app.listen(3000)
IOLoop.instance().start()
это мой основной класс микросервиса:
class Entrypoint(BaseMicroservice):
def __init__(self):
self.config = safe_load(open(sys.argv[1]))
self.dict = {
MessageType.detected_scenes.name: ProcessedSceneHandler(self.config),
}
super().__init__(self.dict, self.config.get('kafka'))
def on_message_received(self, generic_message):
self.dict.get(generic_message.metadata_type).handle(generic_message.message)
t = threading.Thread(target=entrypoint_controller.tornado_thread())
t.start()
Entrypoint().run()
BaseMicroservice - это абстрактный класс, который я реализовал для разделения функций между микросервисами:
class BaseMicroservice(ABC):
def __init__(self, handlers, kafka_cfg):
super().__init__()
self.handlers = handlers
#TODO da cambiare configurazione
self.consumer = KafkaConsumer(
kafka_cfg.get('input_topic'),
bootstrap_servers=kafka_cfg.get('bootstrap_servers'),
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=kafka_cfg.get('group_id'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
def run(self):
for message in self.consumer:
cl.logging.info(message.value)
my_message = GenericMessage(json=message.value)
self.is_my_message(my_message)
def is_my_message(self, generic_message):
if generic_message.metadata_type in self.handlers:
self.on_message_received(generic_message)
@abstractmethod
def on_message_received(self, generic_message):
pass
это явно не работает, так как мне не удается одновременно запустить свой микросервис и контроллер REST. Чего мне не хватает?