Абстрагируйте «контекст GAE» для вашего кода.в производстве предоставьте реальную «реализацию GAE» для тестирования, предоставьте имитацию собственной, которая вызовет DeadlineExceededError.Тест не должен зависеть от тайм-аута, должен быть быстрым.
Пример абстракции (просто склейка):
class AbstractGAETaskContext(object):
def task_spired(): pass # this will throw exception in mock impl
# here you define any method that you call into GAE, to be mocked
def defered(...): pass
Если вам не нравится абстракция, вы можете сделать патч обезьяны для тестированияКроме того, вам также нужно определить функцию task_expired, которая будет вашим хуком для тестирования.task_expired должен вызываться во время выполнения функции вашей задачи.
* ОБНОВЛЕНО * Это 3-е решение:
Сначала я хочу упомянуть, что пример реализации Ника не так уж и велик, класс Mapper имеет множество сфер ответственности (отсрочка, запрос данных, обновление в пакетном режиме);и это делает тест трудным для выполнения, необходимо определить множество насмешек.Таким образом, я извлекаю отсроченные обязанности в отдельный класс. Вы хотите проверить только этот механизм отсрочки, то, что на самом деле происходит (обновление, запрос и т. Д.), Должно обрабатываться в другом тесте .
Вот класс отсрочки, также это больше не зависитна GAE:
class DeferredCall(object):
def __init__(self, deferred):
self.deferred = deferred
def run(self, long_execution_call, context, *args, **kwargs):
''' long_execution_call should return a tuple that tell us how was terminate operation, with timeout and the context where was abandoned '''
next_context, timeouted = long_execution_call(context, *args, **kwargs)
if timeouted:
self.deferred(self.run, next_context, *args, **kwargs)
Вот тестовый модуль:
class Test(unittest.TestCase):
def test_defer(self):
calls = []
def mock_deferrer(callback, *args, **kwargs):
calls.append((callback, args, kwargs))
def interrupted(self, context):
return "new_context", True
d = DeferredCall()
d.run(interrupted, "init_context")
self.assertEquals(1, len(calls), 'a deferred call should be')
def test_no_defer(self):
calls = []
def mock_deferrer(callback, *args, **kwargs):
calls.append((callback, args, kwargs))
def completed(self, context):
return None, False
d = DeferredCall()
d.run(completed, "init_context")
self.assertEquals(0, len(calls), 'no deferred call should be')
Как будет выглядеть реализация Nick's Mapper:
class Mapper:
...
def _continue(self, start_key, batch_size):
... # here is same code, nothing was changed
except DeadlineExceededError:
# Write any unfinished updates to the datastore.
self._batch_write()
# Queue a new task to pick up where we left off.
##deferred.defer(self._continue, start_key, batch_size)
return start_key, True ## make compatible with DeferredCall
self.finish()
return None, False ## make it comaptible with DeferredCall
runner = _continue
Код, в котором вы регистрируете длинныйтекущее задание;это зависит только от отложенной библиотеки GAE.
import DeferredCall
import PersonMapper # this inherits the Mapper
from google.appengine.ext import deferred
mapper = PersonMapper()
DeferredCall(deferred).run(mapper.run)