Я пишу приложение на основе Python, для которого я использую программирование сокетов.
Я придерживаюсь следующего подхода.
Создан 1 сервер TCP / IP, 1 поток клиента TCP / IP контроллера и 3 потока клиентов TCP / IP.
Я хочу, чтобы приложение работало так. Всякий раз, когда контроллер отправляет сообщение, он транслируется всем 3 клиентам TCP / IP. После получения сообщения от контроллера клиентские потоки выполняют некоторую задачу и отправляют данные на сервер.
Теперь сервер должен отправить эти данные в поток контроллера.
Коммуникационная часть клиентов и контроллера работает нормально.
Единственная проблема, с которой я сталкиваюсь, заключается в том, что сервер помещает все данные, полученные от клиентов, в сокет контроллера.
Я хочу, чтобы сервер поместил данные 1 клиентского потока в сокет контроллера, дождитесь, пока эти данные будут получены. Затем поместите данные следующего потока.
Пока я использую SOCK_STREAM для сокетов.
Библиотека: -
#!/usr/bin/python
import select, socket, sys, Queue, errno
usable_port_start = 40000
Internal_ip = "127.0.0.1"
class getTCPports(object):
def __init__(self,starting_port=usable_port_start,address=Internal_ip):
super(getTCPports, self).__init__()
self.IP_address = address
i = 1
delta = 0
while i <= 1:
delta += 2
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
try_port=starting_port + delta
s.bind((self.IP_address,try_port))
self.free_port=try_port
i+=1
except socket.error as e:
if e.errno == errno.EADDRINUSE:
print("Port" , try_port , "is already in use")
else:
# something else raised the socket.error exception
print(e)
s.close()
class IPCLib(getTCPports):
server_port = 0
controller_port = 0
client_map = {}
def __init__(self):
super(IPCLib,self).__init__()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.IP_address, self.free_port))
self.inputs = [self.socket]
self.is_alive = True
self.broadcast_list = []
@classmethod
def modify_server_port(cls,port):
cls.server_port = port
@classmethod
def modify_client_port(cls,identity,port):
if 0 <= identity.find("CONTROLLER"):
cls.controller_port = port
elif 0 <= identity.find("CLIENT"):
cls.client_map[identity] = port
def start_TCP_server(self):
self.socket.setblocking(0)
self.socket.listen(10)
self.modify_server_port(self.free_port)
while self.is_alive:
inputready,outputready,exceptready = select.select(self.inputs,[],[])
for s in inputready: #check each socket that select() said has available data
if s == self.socket: #if select returns our server socket, there is a new
#remote socket trying to connect
client, address = s.accept()
self.inputs.append(client) #add it to the socket list so we can check it now
self.broadcast_list.append(client)
#print 'new client added%s'%str(address)
else:
# select has indicated that these sockets have data available to recv
data = s.recv(4096)
if data:
#print '%s Received From Client(on server)-> %s'%(data,s.getpeername()[1])
#Uncomment below to echo the recv'd data back
#to the sender... loopback!
if s.getpeername()[1]==self.controller_port:
self.broadcast(data)
else: #if sender is monitoring clients, send data to only controller
self.send_to_controller(data)
else:#if recv() returned NULL, that usually means the sender wants
#to close the socket.
s.close()
self.inputs.remove(s)
#if running is ever set to zero, we will call this
server.close()
def start_TCP_client(self,identity):
self.modify_client_port(identity,self.free_port)
self.socket.connect((self.IP_address,self.server_port))
def stop_TCP_client(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def broadcast(self,message):
for client in self.broadcast_list:
if client.getpeername()[1]!=self.controller_port:
try:
client.send(message)
except:
client.close()
# if the link is broken, we remove the client
remove(clients)
def send_to_controller(self,message):
for client in self.broadcast_list:
if client.getpeername()[1]==self.controller_port:
try:
client.send(message)
except:
client.close()
# if the link is broken, we remove the client
remove(clients)
def send_data(self,data):
self.socket.send(data)
def receive_data(self):
message = self.socket.recv(4096)
return message
Программа водителя: -
#!/usr/bin/python
from IPCLib import *
import threading
import os
import time
def run_server():
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_server()
def run_controller(identity):
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_client(identity)
print("server port " , i1.server_port)
print("controller port", i1.controller_port)
print("bsc info", i1.client_map)
time.sleep(1)
while i1.is_alive:
i1.send_data("hello")
print"Next Clock"
sender_map={}
sender_list = []
sender_list = i1.client_map.values()
for sender in sender_list:
sender_map[sender] = False
i=1
#print any(sender_map.values())
while any(value == False for value in sender_map.values()):
print("Loop Iteration %s"%i)
data = i1.receive_data()
temp = data.split(",")
port = temp.pop(0)
sender_map[int(port)] = True
data = ",".join(temp)
print("Data %s received from port %s"%(data,port))
print sender_map
i+=1
print sender_list
time.sleep(1)
i1.stop_TCP_client()
def run_monitors(identity):
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_client(identity)
print("server port " , i1.server_port)
print("controller port", i1.controller_port)
print("bsc info", i1.client_map)
while i1.is_alive:
if i1.receive_data():
output = "%d"%i1.free_port
output = output + "," + "Hello"
i1.send_data(output)
i1.stop_TCP_client()
# creating thread
t1 = threading.Thread(target=run_server, name='server')
t3 = threading.Thread(target=run_monitors, name='Client1',args=("CLIENT-1",))
t4 = threading.Thread(target=run_monitors, name='Client2',args=("CLIENT-2",))
t5 = threading.Thread(target=run_monitors, name='Client3',args=("CLIENT-3",))
t6 = threading.Thread(target=run_monitors, name='Clinet4',args=("CLIENT-4",))
#make threads deamons
t1.daemon = True
t3.daemon = True
t4.daemon = True
t5.daemon = True
t6.daemon = True
# starting threads
try:
t1.start()
time.sleep(0.1)
t3.start()
time.sleep(0.1)
t4.start()
time.sleep(0.1)
t5.start()
time.sleep(0.1)
t6.start()
time.sleep(0.1)
run_controller("CONTROLLER")
except KeyboardInterrupt:
t1.is_alive = False
t3.is_alive = False
t4.is_alive = False
t5.is_alive = False
t6.is_alive = False
Как заставить сервер ждать, пока в сокете уже есть данные?