Как запустить две функции одновременно - PullRequest
15 голосов
/ 21 января 2010

Я запускаю тестирование, но хочу запустить 2 функции одновременно. У меня есть камера, и я говорю ей, чтобы она двигалась с помощью suds, затем я вхожу в камеру через SSH, чтобы проверить скорость, на которой установлена ​​камера. Когда я проверяю скорость, камера остановилась, поэтому скорость недоступна. Есть ли способ заставить эти функции работать одновременно, чтобы проверить скорость камеры. Пример кода ниже:

class VerifyPan(TestAbsoluteMove):

    def runTest(self):

        self.dest.PanTilt._x=350

        # Runs soap move command
        threading.Thread(target = SudsMove).start()

        self.command = './ptzpanposition -c 0 -u degx10'

        # Logs into camera and checks speed
        TestAbsoluteMove.Ssh(self)

        # Position of the camera verified through Ssh (No decimal point added to the Ssh value)
        self.assertEqual(self.Value, '3500')

Я попробовал модуль потоков, как указано ниже. Поток не работает синхронно с функцией TestAbsoluteMove.Ssh (). Есть ли другой код, который мне нужен, чтобы эта работа.

Я смотрел на помещение аргументов в оператор потока, которые указывают, что поток запускается при выполнении функции Ssh (). Кто-нибудь знает, что вводить в это утверждение?

Извините, если я не объяснил правильно. Функция «SudsMove» перемещает камеру, а функция «Ssh» входит в камеру и проверяет скорость, с которой камера движется в данный момент. Проблема в том, что к тому времени, когда функция 'Ssh' регистрирует в камере функцию, перестает работать. Мне нужно, чтобы обе функции работали параллельно, чтобы я мог проверить скорость камеры, пока она еще движется.

Спасибо

Ответы [ 4 ]

12 голосов
/ 21 января 2010

Импортируйте модуль threading и запустите SudsMove() примерно так:

threading.Thread(target = SudsMove).start()

Это создаст и запустит фоновую нить, которая делает движение.

ОТВЕТ НА РЕДАКТИРОВАНИЕ ВОПРОСА:

Насколько я понимаю, TestAbsoluteMove.Ssh(self) опрашивает скорость один раз и сохраняет результат в self.Value ?! И вы тестируете ожидаемый конечный наклон / поворот / положение с помощью self.assertEqual(self.Value, '3500')?!

Если это правильно, вам следует подождать, пока камера начнет движение. Вероятно, вы могли бы опросить скорость в определенном интервале:

# Move camera in background thread
threading.Thread(target = SudsMove).start()

# What does this do?
self.command = './ptzpanposition -c 0 -u degx10'

# Poll the current speed in an interval of 250 ms
import time
measuredSpeedsList = []

for i in xrange(20):
    # Assuming that this call will put the result in self.Value
    TestAbsoluteMove.Ssh(self)
    measuredSpeedsList.append(self.Value)
    time.sleep(0.25)

print "Measured movement speeds: ", measuredSpeedsList

Скорость движения будет наибольшим значением в measuredSpeedsList (т.е. max(measuredSpeedsList)). Надеюсь, что это имеет смысл ...

4 голосов
/ 25 января 2010

Если вы хотите использовать обычную реализацию Python (CPython), вы, безусловно, можете использовать модуль multiprocessing , который творит чудеса (вы можете передавать непроходимые аргументы подпроцессам Удаляет задачи ...), предлагает интерфейс, аналогичный интерфейсу потоков, и не страдает от Глобальной блокировки интерпретатора.

Недостатком является то, что создаются подпроцессы, что занимает больше времени, чем создание потоков; это должно быть проблемой, только если у вас много коротких задач. В ситуациях, когда каждая задача занимает «много времени», модуль многопроцессорный должен быть великолепным.

2 голосов
/ 25 января 2010

Одновременно может быть запущен только один поток. На этот вопрос подробно ответили здесь . Одним из решений будет использование двух отдельных процессов. Приведенный выше ответ дает несколько советов.

1 голос
/ 25 января 2010

Если вы можете заставить свой код работать под Jython или IronPython, то вы можете запускать несколько потоков одновременно; у них нет такой глупой «глобальной блокировки интерпретатора» в CPython.

...