Как проанализировать разные схемы JSON, содержащие похожие данные? - PullRequest
3 голосов
/ 09 мая 2019

Допустим, у меня есть две следующие схемы, в которые я отправляю сообщение в websocketstream и получаю обратно сообщение, содержащее аналогичные данные.

# First Schema
x_sent = {"Product": {"id": "123"}}

x_received = {"properties": {
    "id": {"type": "number"},
    "color": "green"}}

# Second Schema
y_sent = {"Item": {"Product": {"uid": "123"}}}

y_received = {"configs": {
    "id_number": "123",
    "type": "int"},
    "colour": "green"}

Если я хочу различить два потока, я могу отфильтровать содержимое сообщения:

if msg == "properties":
    use_schema_a()
if msg == "configs":
    use_schema_b()

Но это не очень СУХО, если количество различных схем растет. Я также мог бы сделать что-то вроде этого:

msg_routing = {"properties": use_schema_a,
               "configs": use_schema_b}

if msg:
    msg_routing[msg]()

Но тогда я все равно буду создавать функции для каждой схемы! Я чувствую, что что-то упустил (концептуально). Я хотел бы создать общий класс, который будет обрабатывать отправку и получение сообщений и иметь только данные о потоке для фильтрации в виде файла конфигурации.

Это может выглядеть примерно так:

{"schemaA": {"name": "service_ABC", "color": "properties.color", "send_id":"Product:id"},
 "schemaB": {"name": "service_DEF", "color": "configs.colour", "send_id":"Item:Product:uid"}}

Как и в приведенных выше примерах, необходимые мне данные будут такими же (green в этом примере). ID Мне нужно отправить, чтобы получить эти данные также похожи (123 в этом примере).

Итак, если я знаю схему данных, которые мне нужно отправлять и получать, как мне динамически создать что-то, что понимает эту схему?

Чтобы дать вам четкий пример отправной точки:

def on_message(received_msg):
    # The unparsed message we receive is something like
    #      {"properties": {
    #     "id": {"type": "number"},
    #     "color": "green"}}

    # Do our message filtering/parsing

    handle_message_contents(service_name, color)

1 Ответ

2 голосов
/ 12 мая 2019

Сначала вам нужно использовать схему для создания сообщения. Если вы создаете свои схемы, например, так:

schemas = {
  "service_ABC": {
    "send": {
      "id": ["Product", "id"],
    },
    "receive": {
      "color": ["properties", "color"],
    },  
  },
  "service_DEF": {
    "send": {
      "id": ["Item", "Product", "uid"],
      "cond": ["Item", "Condition"],
    },
    "receive": {
      "color": ["configs", "colour"],
    },
  },
}

затем вы можете использовать метод, который, при условии указания имени службы и правильных аргументов, может создать словарь данных для отправки:

def build_request(service, **kwargs):
  request = dict()
  for attribute, path in schemas[service]["send"].items():
    second_to_last_level = request
    last_level = request
    for level in path:
      second_to_last_level = last_level
      last_level = last_level.setdefault(level, dict())
    second_to_last_level[level] = kwargs[attribute]
  return request

Таким образом, вы можете добавлять различные параметры для отправки непосредственно в схему. Смотрите несколько примеров:

build_request("service_ABC", id="123") == {
  "Product": {
    "id": "123"
  }
}

build_request("service_DEF", id="123", cond="New") == {
  "Item": {
    "Product": {
      "uid": "123"
    },
    "Condition": "New"
  }    
}

Далее вам нужно определить, откуда пришло сообщение. Лучший способ сделать это - где-нибудь в апстриме и передать его вашему «процессору схемы» Если у вас нет возможности получить эту информацию вместе с вашим сообщением (в чем я сомневаюсь), вы можете использовать один из предложенных вами подходов.

Как только вы узнаете, из какой службы пришло сообщение (и, следовательно, какую схему использовать, вы можете обработать сообщение аналогично созданию запроса.

def process(service, msg):
  result = dict()
  for attribute, path in schemas[service]["receive"].items():
    value = msg
    for field in path:
      value = value[field]
    result[attribute] = value
  return result

Снова, см. Пример:

x_received = {
  "properties": {
    "id": {
      "type": "number"
    },
    "color": "green"
  }
}
process("service_ABC", x_received) == {
  "color": "green"
}

Если вы действительно не можете сохранить переменную service, чтобы передать ее в process(), то я думаю, что лучшим подходом будет тот, который msg_routing. Вы можете иметь это как отдельный словарь или даже добавить его к schemas. Кроме того, вы всегда можете проверить в process(), получили ли вы то, что ожидали, и если нет, попытаться применить следующую схему:

def process(msg):
  for service, schema in schemas.items():
    missing_something = False
    result = dict()
    for attribute, path in schema["receive"].items():
      value = msg
      for field in path:
        if not field in value:
          missing_something = True
          break
        value = value[field]
      if missing_something:
        break
      result[attribute] = value
    if not missing_something:
      return service, result
  raise RuntimeError("No schema applies")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...