Создание шины событий микросервисов и REST api (python / flask) - PullRequest
0 голосов
/ 13 июля 2020

Фон

Я создаю свое первое приложение с использованием микросервисной архитектуры. Я буду работать в основном в Python, используя Flask.

Я подумываю о реализации шины событий / сообщений для координации действий между службами. Вот несколько сервисов, которые я собираюсь реализовать: Auth, Users, Posts и Chat. В приложении есть две сущности («Пользователь» и «Группа»), которые используются почти каждой службой. У меня есть отдельная база данных для каждой службы, и каждая база данных имеет собственные таблицы users и groups для управления данными пользователя / группы, указанными c для этой службы. Теперь, когда я думаю о таком событии, как создание нового пользователя, каждая служба должна будет создать новую запись в таблице users, поэтому я рассматриваю возможность использования шины событий.

Я прочитал это сообщение , в котором обсуждается CQRS и использование HTTP (REST) ​​для внешней связи между службами, при использовании шины событий для внутренней связи. Службы обрабатывают (HTTP) запросы и генерируют события об изменении данных (например, о создании нового пользователя службой аутентификации). Другие службы используют события, которые могут запускать другие процессы (и другие события).

Вопрос

Я повесил трубку, так это как на самом деле реализовать (в Python) службу, которая прослушивает как для HTTP-запросов, так и для новых событий в наборе подписанных каналов. Я понимаю, что вам нужно использовать такой инструмент, как redis / rabbitMQ, но можно ли обрабатывать оба типа запросов в одном процессе или вам нужно запустить два сервера (один для запросов REST, а другой для обработки событий)?

Кроме того, если у вас есть какие-либо комментарии по поводу общего подхода / архитектуры, описанного выше, я все слышу.

1 Ответ

0 голосов
/ 16 июля 2020

Итак, после дополнительных исследований и создания прототипа, один сервер может прослушивать как HTTP-запросы, так и события от брокера сообщений. Однако для этого требуется запуск двух отдельных процессов (один процесс веб-сервера для прослушивания HTTP и один процесс событий для прослушивания брокера сообщений).

Вот архитектура, которую я разработал для своего прототипа: enter image description here

The core modules (represented by the folder icon) represent the meat of a service, this is all of the code that actually changes data. The HTTP Server and the Event Worker both call methods from the core modules. Niether the HTTP Server or the Event Worker produce events, only the core modules produce events.

Here's a file structure:

Project
 |-Foo
 |  |- foo.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py
 |-Bar
 |  |- bar.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py

The web.py files are simple flask apps:

# bar.py
from flask import Flask, request
from bar import Bar


app = Flask(__name__)

@app.route('/bar')
def bar():
    return Bar.bar_action()

if __name__ == "__main__":
    app.run(port=5001, debug=1)

For both the event worker and the core modules, I used a module revent.py (redis + event) that I created. It consists of three classes:

  1. Event -- abstraction of an event
  2. Producer -- A service/class to be used by core modules to produce events into their event stream.
  3. Worker -- A event server to which you can map events to functions (sort of like routing HTTP endpoints in Flask), it also runs the event loop to listen for events.

Under the hood, this module is using потоки redis . Я вставлю код для revent.py ниже.

Но сначала вот пример примера для bar.py, который вызывается http-сервером и рабочим для выполнения работы и генерирует события о работе это происходит с потоком «bar» в redis.

# Bar/bar.py
from revent import Producer
import redis

class Bar():
    ep = Producer("bar", host="localhost", port=6379, db=0)

    @ep.event("update")
    def bar_action(self, foo, **kwargs):
        print("BAR ACTION")
        #ep.send_event("update", {"test": str(True)})
        return "BAR ACTION"

if __name__ == '__main__':
    Bar().bar_action("test", test="True")

Наконец, вот пример рабочего, который будет прослушивать события в потоке «bar» Foo/worker.py.

# Foo/worker.py
from revent import Worker

worker = Worker()

@worker.on('bar', "update")
def test(foo, test=False):
    if bool(test) == False:
        print('test')
    else:
        print('tested')

if __name__ == "__main__":
    worker.listen(host='127.0.0.1', port=6379, db=0)

Как и обещал, вот код созданного мной модуля revent.py. Вероятно, стоило бы добавить более развитую версию этого в pypl, но я просто использую символьную ссылку, чтобы синхронизировать две мои версии c.

# revent.py
import redis
from datetime import datetime
import functools

class Worker:
    # streams = {
    #   "bar": {
    #       "update": Foo.foo_action
    #   },
    # }

    def __init__(self):
        self._events = {}


    def on(self, stream, action, **options):
        """
        Wrapper to register a function to an event
        """
        def decorator(func):
            self.register_event(stream, action, func, **options)
            return func
        return decorator

    def register_event(self, stream, action, func, **options):
        """
        Map an event to a function
        """
        if stream in self._events.keys():
            self._events[stream][action] = func
        else:
            self._events[stream] = {action: func}

    def listen(self, host, port, db):
        """ 
        Main event loop
        Establish redis connection from passed parameters
        Wait for events from the specified streams
        Dispatch to appropriate event handler
        """
        self._r = redis.Redis(host=host, port=port, db=db)
        streams = " ".join(self._events.keys())
        while True:
            event = self._r.xread({streams: "$"}, None, 0) 
            # Call function that is mapped to this event
            self._dispatch(event)

    def _dispatch(self, event):
        """
        Call a function given an event

        If the event has been registered, the registered function will be called with the passed params.
        """
        e = Event(event=event)
        if e.action in self._events[e.stream].keys():
            func = self._events[e.stream][e.action]
            print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
            return func(**e.data)


class Event():
    """
    Abstraction for an event 
    """
    def __init__(self, stream="", action="", data={}, event=None):
        self.stream = stream
        self.action = action
        self.data = data
        self.event_id=None
        if event:
            self.parse_event(event)

    def parse_event(self, event):
        # event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
        self.stream = event[0][0].decode('utf-8')
        self.event_id = event[0][1][0][0].decode('utf-8')
        self.data = event[0][1][0][1]
        self.action = self.data.pop(b'action').decode('utf-8')
        params = {}
        for k, v in self.data.items():
            params[k.decode('utf-8')] = v.decode('utf-8')
        self.data = params

    def publish(self, r):
        body = {
            "action": self.action
        }
        for k, v in self.data.items():
            body[k] = v
        r.xadd(self.stream, body)

class Producer:
    """
    Abstraction for a service (module) that publishes events about itself

    Manages stream information and can publish events
    """
    # stream = None
    # _r = redis.Redis(host="localhost", port=6379, db=0)

    def __init__(self, stream_name, host, port, db):
        self.stream = stream_name
        self._r = redis.Redis(host="localhost", port=6379, db=0)

    def send_event(self, action, data):
        e = Event(stream=self.stream, action=action, data=data)
        e.publish(self._r)

    def event(self, action, data={}):
        def decorator(func):
            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                result = func(*args, **kwargs)
                arg_keys = func.__code__.co_varnames[1:-1]
                for i in range(1, len(args)):
                    kwargs[arg_keys[i-1]] = args[i]
                self.send_event(action, kwargs)
                return result           
            return wrapped
        return decorator


Итак, собираем все вместе. Модули foo.py и bar.py выполняют фактическую работу сервисов Foo и Bar соответственно. Их методы вызываются HTTP-сервером и обработчиком событий для обработки запросов / событий. Выполняя свою работу, эти два модуля генерируют события об изменении своего состояния, чтобы другие заинтересованные службы могли действовать соответствующим образом. HTTP-сервер - это обычное веб-приложение, например Flask. Обработчик событий похож по концепции на веб-сервер, который прослушивает события в redis вместо HTTP-запросов. Оба этих процесса (веб-сервер и обработчик событий) должны запускаться отдельно. Итак, если вы разрабатываете локально, вам нужно запускать их в другом терминале windows или с помощью оркестратора контейнеров / процессов.

Это было много. Надеюсь, это кому-то поможет, дайте мне знать в комментариях, если у вас есть вопросы.

Edit

Я загрузил файл revent.py в pypi как пакет - redisevents . Позже на этой неделе я добавлю дополнительную документацию о том, как его использовать / расширить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...