Используя превосходный ответ Саксона, я смог сделать то же самое, используя испытательный стенд вместо gaetestbed. Вот что я сделал.
Добавил это в мой setUp()
:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
Затем в своем тесте я использовал следующее:
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
Где-то вдоль линии параметры POST кодируются в base64, поэтому пришлось отменить это, чтобы заставить его работать.
Мне нравится этот ответ лучше, чем ответ Саксона, поскольку я могу использовать официальный пакет тестового стенда и могу делать все это в своем собственном тестовом коде.
РЕДАКТИРОВАТЬ: Позже я захотел сделать то же самое с задачами, переданными с использованием отложенной библиотеки, и потребовалось немало потрясений, чтобы понять это, поэтому я делюсь здесь, чтобы облегчить боль других людей.
Если ваша очередь задач содержит только задачи, отправленные с отложенным выполнением, тогда будут запущены все задачи и любые задачи, поставленные в очередь этими задачами:
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")