Запустите REST Controller в отдельном потоке внутри микросервиса python - PullRequest
0 голосов
/ 14 января 2019

Я пытаюсь разработать микросервис python, который может обрабатывать запросы REST API, а также обрабатывать сообщения от брокера kafka.

Мой REST-контроллер Tornado выглядит следующим образом:

class HelloHandler(RequestHandler):
    def get(self):
        self.write({'message': 'hello world'})


def make_app():
    urls = [("/", HelloHandler)]
    return Application(urls)


def tornado_thread():
    app = make_app()
    app.listen(3000)
    IOLoop.instance().start()

это мой основной класс микросервиса:

class Entrypoint(BaseMicroservice):

    def __init__(self):
        self.config = safe_load(open(sys.argv[1]))
        self.dict = {
            MessageType.detected_scenes.name: ProcessedSceneHandler(self.config),
        }
        super().__init__(self.dict, self.config.get('kafka'))

    def on_message_received(self, generic_message):
        self.dict.get(generic_message.metadata_type).handle(generic_message.message)



t = threading.Thread(target=entrypoint_controller.tornado_thread())
t.start()
Entrypoint().run()

BaseMicroservice - это абстрактный класс, который я реализовал для разделения функций между микросервисами:

class BaseMicroservice(ABC):
    def __init__(self, handlers, kafka_cfg):
        super().__init__()
        self.handlers = handlers
        #TODO da cambiare configurazione
        self.consumer = KafkaConsumer(
            kafka_cfg.get('input_topic'),
            bootstrap_servers=kafka_cfg.get('bootstrap_servers'),
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=kafka_cfg.get('group_id'),
            value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    def run(self):
        for message in self.consumer:
            cl.logging.info(message.value)
            my_message = GenericMessage(json=message.value)
            self.is_my_message(my_message)

    def is_my_message(self, generic_message):
        if generic_message.metadata_type in self.handlers:
            self.on_message_received(generic_message)

    @abstractmethod
    def on_message_received(self, generic_message):
        pass

это явно не работает, так как мне не удается одновременно запустить свой микросервис и контроллер REST. Чего мне не хватает?

1 Ответ

0 голосов
/ 14 января 2019

Для запуска микросервиса и одновременной работы контроллера REST я бы посоветовал вместо использования потоков использовать Process

.

https://docs.python.org/3/library/multiprocessing.html

...