вернуть контроль на транспорт - PullRequest
0 голосов
/ 20 ноября 2018

Я пытаюсь смоделировать ситуацию, когда данные периодически принимаются сервером.В моей установке я запускаю один процесс, который устанавливает сервер, и другой процесс, который устанавливает группу клиентов (достаточно подумать об одном клиенте).Я настроил часть кода, собирая кусочки в основном из здесь .Сервер / клиенты связываются, отправляя сообщения с использованием transport.write.Во-первых, сервер сообщает клиентам начать (это отлично работает AFAIK).Клиенты отчитываются на сервере, как они прогрессируют.Что меня смутило, так это то, что я получаю эти прерывистые сообщения только в самом конце, когда клиент готов.Это может быть проблемой с очисткой буфера, и я попробовал (безуспешно) такие вещи, как This .Кроме того, каждое сообщение довольно большое, и я пытался отправить одно и то же сообщение несколько раз, чтобы очистить буфер.

Я подозреваю, что я вижу проблему с возвратом элемента управления в транспорт , но я не могу понять, как с этим покончить.

Любая помощь по этому или любым другим вопросам, которые возникают у вас, очень ценится.

Сервер:

from twisted.internet import reactor, protocol

import time
import serverSideAnalysis
import pdb
#import bson, json, msgpack
import _pickle as pickle  # I expect the users to authenticate and not 
                          # do anything malicious. 


PORT = 9000
NUM = 1
local_scratch="/local/scratch"


class Hub(protocol.Protocol):
  def __init__(self,factory, clients, nclients):
    self.clients = clients 
    self.nclients = nclients
    self.factory = factory
    self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self, 
          local_scratch)

  def connectionMade(self):
    print("connected to user" , (self))
    if len(self.clients) < self.nclients:
      self.factory.clients.append(self)
    else:
      self.factory.clients[self.nclients] = self
    if len(self.clients) == NUM:
      val = input("Looks like everyone is here, shall we start? (Y/N)")
      while (val.upper() != "Y"):
        time.sleep(20)
        val = input("Looks like everyone is here, shall we start??? (Y/N)")
      message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
      self.message(message) # This reaches the client as I had expected

  def message(self, command):
    for c in self.factory.clients:
      c.transport.write(command)

  def connectionLost(self, reason):
    self.factory.clients.remove(self)
    self.nclients -= 1

  def dataReceived(self, data):
    if len(self.clients) == NUM:
      self.dispatcher.dispatch(data)

class PauseTransport(protocol.Protocol):
  def makeConnection(self, transport):
    transport.pauseProducing()

class HubFactory(protocol.Factory):
  def __init__(self, num):
    self.clients = set([])
    self.nclients = 0 
    self.totConnections = num

  def buildProtocol(self, addr):
    print(self.nclients)
    if self.nclients < self.totConnections:
      self.nclients += 1
      return Hub(self, self.clients, self.nclients)
    protocol = PauseTransport()
    protocol.factory = self
    return protocol

factory = HubFactory(NUM)
reactor.listenTCP(PORT, factory)
factory.clients = []
reactor.run()

Клиент:

from twisted.internet import reactor, protocol
import time
import clientSideAnalysis
import sys


HOST = 'localhost'
PORT = 9000
local_scratch="/local/scratch"

class MyClient(protocol.Protocol):

  def connectionMade(self):
    print("connected!")
    self.factory.clients.append(self)
    print ("clients are ", self.factory.clients)

    self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

  def clientConnectionLost(self, reason):
    #TODO send warning
    self.factory.clients.remove(self)

  def dataReceived(self, data): #This is the problematic part I think
    self.cdispatcher.dispatch(data)
    print("1 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    print("2 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    time.sleep(10)


  def message(self, data):
    self.transport.write(data)

class MyClientFactory(protocol.ClientFactory):
  protocol = MyClient

if __name__=="__main__":
  analysis_file_name = sys.argv[1]

  factory = MyClientFactory()
  reactor.connectTCP(HOST, PORT, factory)
  factory.clients = []
  reactor.run()

Последний бит соответствующей информации о том, что делают диспетчеры.

В обоих случаях они загружают поступившее сообщение (словарь) и выполняют несколько вычислений на основе содержимого.Время от времени они используют метод message для связи с текущими значениями.

Наконец, я использую Python 3.6.и скрученный 18.9.0

1 Ответ

0 голосов
/ 20 ноября 2018

Способ, которым вы возвращаете управление реактору из метода Protocol.dataReceived, заключается в том, что вы возвращаетесь из этого метода.Например:

def dataReceived(self, data):
    self.cdispatcher.dispatch(data)
    print("1 sent")

Если вы хотите, чтобы больше работы происходило после , у вас есть несколько вариантов.Если вы хотите, чтобы работа выполнялась через некоторое время, используйте reactor.callLater.Если вы хотите, чтобы работа выполнялась после ее отправки в другой поток, используйте twisted.internet.threads.deferToThread.Если вы хотите, чтобы работа выполнялась в ответ на какое-то другое событие (например, полученные данные), поместите его в обратный вызов, который обрабатывает это событие (например, dataReceived).

...