Сначала вам нужно использовать схему для создания сообщения. Если вы создаете свои схемы, например, так:
schemas = {
"service_ABC": {
"send": {
"id": ["Product", "id"],
},
"receive": {
"color": ["properties", "color"],
},
},
"service_DEF": {
"send": {
"id": ["Item", "Product", "uid"],
"cond": ["Item", "Condition"],
},
"receive": {
"color": ["configs", "colour"],
},
},
}
затем вы можете использовать метод, который, при условии указания имени службы и правильных аргументов, может создать словарь данных для отправки:
def build_request(service, **kwargs):
request = dict()
for attribute, path in schemas[service]["send"].items():
second_to_last_level = request
last_level = request
for level in path:
second_to_last_level = last_level
last_level = last_level.setdefault(level, dict())
second_to_last_level[level] = kwargs[attribute]
return request
Таким образом, вы можете добавлять различные параметры для отправки непосредственно в схему. Смотрите несколько примеров:
build_request("service_ABC", id="123") == {
"Product": {
"id": "123"
}
}
build_request("service_DEF", id="123", cond="New") == {
"Item": {
"Product": {
"uid": "123"
},
"Condition": "New"
}
}
Далее вам нужно определить, откуда пришло сообщение. Лучший способ сделать это - где-нибудь в апстриме и передать его вашему «процессору схемы» Если у вас нет возможности получить эту информацию вместе с вашим сообщением (в чем я сомневаюсь), вы можете использовать один из предложенных вами подходов.
Как только вы узнаете, из какой службы пришло сообщение (и, следовательно, какую схему использовать, вы можете обработать сообщение аналогично созданию запроса.
def process(service, msg):
result = dict()
for attribute, path in schemas[service]["receive"].items():
value = msg
for field in path:
value = value[field]
result[attribute] = value
return result
Снова, см. Пример:
x_received = {
"properties": {
"id": {
"type": "number"
},
"color": "green"
}
}
process("service_ABC", x_received) == {
"color": "green"
}
Если вы действительно не можете сохранить переменную service
, чтобы передать ее в process()
, то я думаю, что лучшим подходом будет тот, который msg_routing
. Вы можете иметь это как отдельный словарь или даже добавить его к schemas
. Кроме того, вы всегда можете проверить в process()
, получили ли вы то, что ожидали, и если нет, попытаться применить следующую схему:
def process(msg):
for service, schema in schemas.items():
missing_something = False
result = dict()
for attribute, path in schema["receive"].items():
value = msg
for field in path:
if not field in value:
missing_something = True
break
value = value[field]
if missing_something:
break
result[attribute] = value
if not missing_something:
return service, result
raise RuntimeError("No schema applies")