Можно ли предотвратить выполнение дальнейших задач в Locust TaskSequence, если какая-то задача не удалась? - PullRequest
0 голосов
/ 17 апреля 2020

Например, у меня есть следующий класс. Как я могу предотвратить выполнение задачи get_entity, если задача create_entity не была выполнена?

class MyTaskSequence(TaskSequence):

    @seq_task(1)
    def create_entity(self):
        self.round += 1
        with self.client.post('/entities', json={}, catch_response=True) as resp:
            if resp.status_code != HTTPStatus.CREATED:
                resp.failure()
                # how to stop other tasks for that run?

        self.entity_id = resp.json()['data']['entity_id']

    @seq_task(2)
    def get_entity(self):
        # It is being always executed, 
        # but it should not be run if create_entity task failed
        resp = self.client.get(f'/entities/{self.entity_id}')
        ...

Я нашел метод TaskSet.interrupt в документации, но не позволяет отменить root TaskSet. Я попытался создать родительский TaskSet для своей последовательности задач, поэтому TaskSet.interrupt работает.

class MyTaskSet(TaskSet):
    tasks = {MyTaskSequence: 10}

Но теперь я вижу, что все результаты в интерфейсе очищаются после того, как я позвоню interrupt!

Мне просто нужно пропустить зависимые задачи в этой последовательности. Мне нужны результаты.

Ответы [ 3 ]

0 голосов
/ 20 апреля 2020

Может self.interrupt() быть тем, что вы ищете?

См. https://docs.locust.io/en/latest/writing-a-locustfile.html#interrupting -a-taskset для справки.

0 голосов
/ 21 апреля 2020

Почему бы не использовать on_start(self):, который запускается один раз при создании саранчи, он может установить глобальный параметр, который можно проверить, выполняет ли саранча задачи

class MyTaskSequence(TaskSequence):
    entity_created = false

    def on_start(self):
        self.round += 1
        with self.client.post('/entities', json={}, catch_response=True) as resp:
            if resp.status_code != HTTPStatus.CREATED:
                self.entity_created = true
                resp.failure()

        self.entity_id = resp.json()['data']['entity_id']

    @seq_task(2)
    def get_entity(self):
        if self.entity_created:
            resp = self.client.get(f'/entities/{self.entity_id}')
            ...
0 голосов
/ 17 апреля 2020

Самый простой способ решить эту проблему - просто использовать один @task с несколькими запросами внутри. Затем, если запрос не удался, просто выполните return после resp.failure ()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...