Как я могу получить доступ к фиктивной шине `dbusmock` в коде, переданном в` DBusMockObject.AddMethod`? - PullRequest
0 голосов
/ 24 января 2020

Я пытаюсь использовать библиотеку Python dbusmock для настройки интеграционного теста программы, которая отправляет сообщение объекту dbus, который выполняет некоторую обработку и затем перенаправляет результат другому объекту, чей путь был указан в начальном запросе. Таким образом, синхронного возврата нет, поток выглядит следующим образом:

  1. (тест), настройка фиктивной шины сеанса и регистрация макета по пути /my/request/processor.
  2. ( Реальный код) отправьте сообщение на /my/request/processor, содержащее запрос и путь к /my/response/handler.
  3. (в этом тесте макет) получите запрос и отправьте ответ на /my/response/handler.
  4. (реальный код) получает сообщение на /my/response/handler.
  5. (тест), проверяет, что полученный ответ соответствует ожидаемому.

Я настраиваюсь макет с использованием dbusmock макет объекта, который принимает код для запуска по запросу в виде строки. Вот тест и установочный код:

class TestDbusIntegration(dbusmock.DBusTestCase):
    @classmethod
    def setUpClass(c):
        c.start_session_bus()

    def create_dbus_listener(self, server_name, selector_name):
        bus = self.get_dbus(system_bus=False)
        object_name = 'online.labrary.{}'.format(server_name)
        object_path = '/online/labrary/{}'.format(server_name)
        server = self.spawn_server(object_name,
                                   object_path,
                                   'online.labrary.nanoservice', #interface
                                   system_bus=False,
                                   stdout=subprocess.PIPE)
        interface = dbus.Interface(bus.get_object(object_name,
                                                  object_path),
                                   dbusmock.MOCK_IFACE)
        # The input signature here is complex, four arguments:
        # 1. String, the selector name
        # 2. String, the request (I will make this variant later)
        # 3. Object, where the response should be posted
        # 4. String, selector to post response to (will be emitString:)
        response_code = ''
        with open('dbus_forward_response.py', 'r') as f:
            response_code = f.read()
        interface.AddMethod('', #Interface - chose the main interface here
                            'msg_send',
                            'ssos', #Input signature
                            '', #Output signature - void
                            response_code) # Code - should post response
        return (server, interface)

    def test_sending_request_to_named_server(self):
        # set up a process to receive the message
        server_name = "test_server"
        selector_name = "handle_request:"
        (server, interface) = self.create_dbus_listener(server_name, selector_name)
        message = "GET /index.html HTTP/1.1\n\n"
        # send the message
        response = Nanoservice().dispatch_nanoservice_request(server_name, selector_name, message)
        # expect the message sent by the test process
        self.assertEqual(response, 'HTTP/1.1 204 No Content\n\n')
        # tear down mock
        server.terminate()
        server.wait()

У меня возникают проблемы с настройкой поведения test_server до самого с использованием фиктивной шины вместо реальной сессионный автобус. Код в dbus_forward_response.py выглядит следующим образом:

import dbus

selector = args[0]
request = args[1]
forward_obj_path = args[2]
forward_selector = args[3]

session_bus = dbus.SessionBus()
# get proxy for sending message
proxy = session_bus.get_object('online.labrary.nanoservice',
                               forward_obj_path)
interface = dbus.Interface(proxy, dbus_interface='online.labrary.nanoservice')
interface.msg_send(forward_selector, request)

Это, очевидно, использование реальной шины сеанса dbus, что приводит к этой ошибке:

E dbus.exceptions.DBusException: org .freedesktop.DBus.Error.ServiceUnknown: Имя online.labrary.nanoservice не было предоставлено никакими файлами .service

Можно ли выполнить одно из этих действий?

  1. (предпочтительно) получить код обработчика в моем фиктивном объекте dbus для отправки сообщений по той же шине, по которой был получен запрос; или
  2. (достаточно) получить код обработчика для использования фиктивного сеанса связи dbus, созданного в тесте?
...