Очереди задач модульного тестирования в AppEngine - PullRequest
17 голосов
/ 08 февраля 2011

В течение очень долгого времени я использовал очереди задач в AppEngine для планирования задач, как я и предполагал.

Но меня всегда интересовало, как писатьтесты для этого?До сих пор я просто делал тесты, чтобы убедиться, что в API, который ставит задачу в очередь, не возникает ошибка, а затем написал более подходящие тесты для API, выполняющего задачу.

Однако в последнее время я началЯ чувствую себя немного неудовлетворенным этим, и я ищу способ действительно проверить, что правильная задача была добавлена ​​в правильную очередь.Надеюсь, это можно сделать лучше, чем просто развернув код и надеясь на лучшее.

Я использую django-nonrel, если это имеет какое-либо отношение к ответу.

Напомним:Как написать модульный тест для подтверждения того, что задачи поставлены в очередь?

Ответы [ 4 ]

18 голосов
/ 05 июля 2012

Если вы используете google.appengine.ext.testbed вместо GAE Testbed (GAE Testbed устарел и перемещается в ext.testbed), вы можете сделать следующее:

import base64
import unittest2

from google.appengine.ext import deferred
from google.appengine.ext import testbed


class TestTasks(unittest2.TestCase):
  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def test_send_contact_request(self):
    # Make the request to your app that "defers" something:
    response = ...
    self.assertEqual(response.status_int, 200)

    # Get the task out of the queue
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(1, len(tasks))

    # Run the task
    task = tasks[0]
    deferred.run(task.payload)

    # Assert that other things happened (ie, if the deferred was sending mail...)
    self.assertEqual(...)
13 голосов
/ 08 февраля 2011

Использование GAE Test Bed позволит вам заглушить очередь задач.

Если вы наследуете от FunctionalTestCase или TaskQueueTestCase, вы получите такие методы, как get_tasks и assertTasksInQueue.

Вы также можете запускать задачи.Как это сделать, зависит от того, используете ли вы задачи или отложенные.

Для отсрочек у меня есть такой код:

from google.appengine.ext import deferred
import base64

# gets the most recent task -- since the task queue is reset between tests,
# this is usually what you want
def get_task(self):
    for task in self.get_task_queue_stub().GetTasks('default'):
        return task

# decode and execute the deferred method
def run_deferred(task):
    deferred.run(base64.b64decode(task['body']))

Выполнение задач аналогично, но после извлечения задачи вы используете WebTest (на котором построен тестовый стенд GAE).) отправить в виде запроса POST URL-адрес задачи с ее параметрами.

1 голос
/ 08 февраля 2011

Существует небольшой тестовый фреймворк под названием gaetestbed, который может удовлетворить ваши потребности. За подробностями обращайтесь: https://github.com/jgeewax/gaetestbed.

Эта среда тестирования работает в сочетании с плагином носа, носа и пакета WebTest. Насколько мне известно, данный набор пакетов Python - лучший способ протестировать приложения GAE.

0 голосов
/ 31 марта 2011

API SDK 1.4.3 Testbed обеспечивает простую настройку библиотек-заглушек для локальных интеграционных тестов.

Доступны сервисные заглушки для Task Queue.

...