Итак, после дополнительных исследований и создания прототипа, один сервер может прослушивать как HTTP-запросы, так и события от брокера сообщений. Однако для этого требуется запуск двух отдельных процессов (один процесс веб-сервера для прослушивания HTTP и один процесс событий для прослушивания брокера сообщений).
Вот архитектура, которую я разработал для своего прототипа:
The core modules (represented by the folder icon) represent the meat of a service, this is all of the code that actually changes data. The HTTP Server and the Event Worker both call methods from the core modules. Niether the HTTP Server or the Event Worker produce events, only the core modules produce events.
Here's a file structure:
Project
|-Foo
| |- foo.py
| |- web.py
| |- worker.py
| |- revent.py
|-Bar
| |- bar.py
| |- web.py
| |- worker.py
| |- revent.py
The web.py
files are simple flask apps:
# bar.py
from flask import Flask, request
from bar import Bar
app = Flask(__name__)
@app.route('/bar')
def bar():
return Bar.bar_action()
if __name__ == "__main__":
app.run(port=5001, debug=1)
For both the event worker and the core modules, I used a module revent.py
(redis + event) that I created. It consists of three classes:
- Event -- abstraction of an event
- Producer -- A service/class to be used by core modules to produce events into their event stream.
- Worker -- A event server to which you can map events to functions (sort of like routing HTTP endpoints in Flask), it also runs the event loop to listen for events.
Under the hood, this module is using потоки redis . Я вставлю код для revent.py
ниже.
Но сначала вот пример примера для bar.py
, который вызывается http-сервером и рабочим для выполнения работы и генерирует события о работе это происходит с потоком «bar» в redis.
# Bar/bar.py
from revent import Producer
import redis
class Bar():
ep = Producer("bar", host="localhost", port=6379, db=0)
@ep.event("update")
def bar_action(self, foo, **kwargs):
print("BAR ACTION")
#ep.send_event("update", {"test": str(True)})
return "BAR ACTION"
if __name__ == '__main__':
Bar().bar_action("test", test="True")
Наконец, вот пример рабочего, который будет прослушивать события в потоке «bar» Foo/worker.py
.
# Foo/worker.py
from revent import Worker
worker = Worker()
@worker.on('bar', "update")
def test(foo, test=False):
if bool(test) == False:
print('test')
else:
print('tested')
if __name__ == "__main__":
worker.listen(host='127.0.0.1', port=6379, db=0)
Как и обещал, вот код созданного мной модуля revent.py
. Вероятно, стоило бы добавить более развитую версию этого в pypl, но я просто использую символьную ссылку, чтобы синхронизировать две мои версии c.
# revent.py
import redis
from datetime import datetime
import functools
class Worker:
# streams = {
# "bar": {
# "update": Foo.foo_action
# },
# }
def __init__(self):
self._events = {}
def on(self, stream, action, **options):
"""
Wrapper to register a function to an event
"""
def decorator(func):
self.register_event(stream, action, func, **options)
return func
return decorator
def register_event(self, stream, action, func, **options):
"""
Map an event to a function
"""
if stream in self._events.keys():
self._events[stream][action] = func
else:
self._events[stream] = {action: func}
def listen(self, host, port, db):
"""
Main event loop
Establish redis connection from passed parameters
Wait for events from the specified streams
Dispatch to appropriate event handler
"""
self._r = redis.Redis(host=host, port=port, db=db)
streams = " ".join(self._events.keys())
while True:
event = self._r.xread({streams: "$"}, None, 0)
# Call function that is mapped to this event
self._dispatch(event)
def _dispatch(self, event):
"""
Call a function given an event
If the event has been registered, the registered function will be called with the passed params.
"""
e = Event(event=event)
if e.action in self._events[e.stream].keys():
func = self._events[e.stream][e.action]
print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
return func(**e.data)
class Event():
"""
Abstraction for an event
"""
def __init__(self, stream="", action="", data={}, event=None):
self.stream = stream
self.action = action
self.data = data
self.event_id=None
if event:
self.parse_event(event)
def parse_event(self, event):
# event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
self.stream = event[0][0].decode('utf-8')
self.event_id = event[0][1][0][0].decode('utf-8')
self.data = event[0][1][0][1]
self.action = self.data.pop(b'action').decode('utf-8')
params = {}
for k, v in self.data.items():
params[k.decode('utf-8')] = v.decode('utf-8')
self.data = params
def publish(self, r):
body = {
"action": self.action
}
for k, v in self.data.items():
body[k] = v
r.xadd(self.stream, body)
class Producer:
"""
Abstraction for a service (module) that publishes events about itself
Manages stream information and can publish events
"""
# stream = None
# _r = redis.Redis(host="localhost", port=6379, db=0)
def __init__(self, stream_name, host, port, db):
self.stream = stream_name
self._r = redis.Redis(host="localhost", port=6379, db=0)
def send_event(self, action, data):
e = Event(stream=self.stream, action=action, data=data)
e.publish(self._r)
def event(self, action, data={}):
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
result = func(*args, **kwargs)
arg_keys = func.__code__.co_varnames[1:-1]
for i in range(1, len(args)):
kwargs[arg_keys[i-1]] = args[i]
self.send_event(action, kwargs)
return result
return wrapped
return decorator
Итак, собираем все вместе. Модули foo.py
и bar.py
выполняют фактическую работу сервисов Foo и Bar соответственно. Их методы вызываются HTTP-сервером и обработчиком событий для обработки запросов / событий. Выполняя свою работу, эти два модуля генерируют события об изменении своего состояния, чтобы другие заинтересованные службы могли действовать соответствующим образом. HTTP-сервер - это обычное веб-приложение, например Flask. Обработчик событий похож по концепции на веб-сервер, который прослушивает события в redis вместо HTTP-запросов. Оба этих процесса (веб-сервер и обработчик событий) должны запускаться отдельно. Итак, если вы разрабатываете локально, вам нужно запускать их в другом терминале windows или с помощью оркестратора контейнеров / процессов.
Это было много. Надеюсь, это кому-то поможет, дайте мне знать в комментариях, если у вас есть вопросы.
Edit
Я загрузил файл revent.py в pypi как пакет - redisevents . Позже на этой неделе я добавлю дополнительную документацию о том, как его использовать / расширить.