Python: как отправлять пакеты в многопоточном режиме, а затем поток уничтожает себя - PullRequest
5 голосов
/ 03 марта 2009

У меня есть вопрос. Я хотел бы посылать непрерывные потоки байтов на некоторый хост на определенное количество времени (скажем, 1 минута), используя python.

Вот мой код:

#! /usr/bin/env python                                                          

import socket
import thread
import time

IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing

def send_data():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect((IP, PORT))
  count = 1
  starttime = time.clock()
  while elapsed < test_time:
    sent = s.send(DATA + str(count) + "\n")
    if sent == 0: break # assume that if nothing is sent -> connection died
    count = count+1
    elapsed = time.clock() - starttime
    if killed:
      break
  s.close()
  print str(count) + " has been sent"

print "to quit type quit"
thread.start_new_thread(send_data, ())

while True:
  var = raw_input("Enter something: ")
  if var == "quit":
    killed = True

Несколько вопросов, есть ли лучший способ позволить потоку умереть через 60 секунд, кроме опроса time.clock каждый раз? Когда я запускаю эту программу, она отправляет байты правильно, но когда я набрал quit, другой поток не умрет, даже если я установил var kill = True. Интересно, почему это? сфера действия var Killed должна доходить до другого потока, верно?

Спасибо

Ответы [ 7 ]

5 голосов
/ 03 марта 2009

Я рекомендовал использовать модуль потоков. Еще большим преимуществом является использование InterruptableThread для завершения потока. Вам не нужно использовать флаг для завершения вашего потока, но исключение произойдет, если вы вызовите terminate () в этом потоке из parent. Вы можете обрабатывать исключения или нет.

import threading, ctypes

class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def raise_exc(self, excobj):
    assert self.isAlive(), "thread must be started"
    for tid, tobj in threading._active.items():
        if tobj is self:
            self._async_raise(tid, excobj)
            return

def terminate(self):
    self.raise_exc(SystemExit)

EDIT: Вы можете переписать свой код таким образом, используя другой поток, который ждет 1 минуту, а затем убивает другой поток

def send_data:
    IP = ...
    # other vars

    ...
    s = socket.socket(.....)

    # no killed checking
    # no time checking
    # just do your work here
    ...
    s.close()


my_thread = InterruptableThread(target=send_data)
my_thread.start()

def one_minute_kill(who):
   time.sleep(60)
   who.terminate()

killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()

print "to quit type quit"
while my_thread.isAlive():
  if raw_input("Enter something: ") == "quit":
    my_thread.terminate()
2 голосов
/ 03 марта 2009

Я не знаю, как это сделать с модулем "Thread", но я могу сделать это с модулем "Threading". Я думаю, что этот код выполняет то, что вы хотите.

Для документации по резьбовому модулю: http://docs.python.org/library/threading.html

#!/usr/bin/python

import time
from threading import Thread
import threading
import sys

test_time = 10
killed = False

class SillyThread( threading.Thread ):
    def run(self):
        global killed
        starttime = time.time()
        counter = 0
        while (time.time() - starttime) < test_time:
            if killed:
                break
            counter = counter + 1
            time.sleep(0.1)
        print "I did %d loops" % counter

class ManageThread( threading.Thread ):
    def run(self):
        global killed
        while True:
            var = raw_input("Enter something: ")
            if var == "quit":
                killed = True
                break
        print "Got var [%s]" % var

silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)

Обратите внимание, что я использую time.time () вместо time.clock (). time.clock () показывает истекшее время процессора в Unix (см. http://docs.python.org/library/time.html). Я думаю, time.clock () должен работать везде. Я установил свой test_time на 10 секунд, потому что у меня нет терпения ни минуты. 1009 *

Вот что произойдет, если я позволю ему работать полные 10 секунд:

leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye

Вот что произойдет, если я наберу 'quit':

leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye

Надеюсь, это поможет.

1 голос
/ 14 ноября 2009

Вы можете сделать это довольно легко без потоков. Например, используя Twisted, вы просто настраиваете синхронизированный вызов и продюсера:

from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor

class Noisy(Protocol):
    def __init__(self, delay, data):
        self.delay = delay
        self.data = data

    def stop(self):
        self.transport.unregisterProducer()
        self.transport.loseConnection()
        reactor.stop()

    def resumeProducing(self):
        self.transport.write(self.data)

    def connectionMade(self):
        self.transport.registerProducer(self, False)
        reactor.callLater(self.delay, self.stop)

factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()

Это имеет различные преимущества перед многопоточным подходом. Он не полагается на потоки демона, поэтому вы можете на самом деле очистить сетевое соединение (например, чтобы отправить закрытое сообщение при необходимости), а не полагаться на платформу для его уничтожения. Он обрабатывает весь фактический низкоуровневый сетевой код для вас (ваш оригинальный пример неправильно работает в случае, когда socket.send возвращает 0; этот код будет обрабатывать этот случай правильно). Вам также не нужно полагаться на ctypes или непонятный API-интерфейс CPython для вызова исключения в другом потоке (так что он переносим на большее количество версий Python и может фактически прервать заблокированную передачу, в отличие от некоторых других предлагаемых подходов). 1004 *

1 голос
/ 03 марта 2009

Как упоминалось выше, используйте модуль threading , он намного проще в использовании и предоставляет несколько примитивов синхронизации. Он также предоставляет класс Timer , который запускается через указанное количество времени.

Если вы просто хотите, чтобы программа выходила, вы можете просто сделать отправляющий поток демоном. Вы делаете это, вызывая setDaemon (True) перед вызовом start () (вместо 2.6 может использоваться атрибут daemon). Python не выйдет, пока не запущен поток, не являющийся демоном.

0 голосов
/ 04 марта 2009

Легко проверить область действия killed:

>>> import thread
>>> killed = False
>>> import time
>>> def test():
...  while True:
...   time.sleep(1)
...   if killed:
...     print 'Dead.'
...     break
... 
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.
0 голосов
/ 03 марта 2009

Переменная не инициализирована. Установите его на ноль выше цикла while.

0 голосов
/ 03 марта 2009

Убедитесь, что «выход» работает правильно, и добавьте мелкий шрифт, чтобы проверить работоспособность ввода.

if var == "quit":
 print "Hey we got quit"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...