Я хочу автоматизировать тестирование прибора и написал небольшую серверную программу для имитации прибора, который будет отправлять команду обратно, кроме случаев, когда он получает специальную команду "* IDN?". Когда я запускал эхо-сервер напрямую в его собственном сценарии, а затем запускал клиентский сценарий отдельно, все отлично работает, и я получаю ожидаемые результаты. Теперь я хотел запустить сервер прямо из скрипта тестирования. Поэтому я подумал, что начну с многопроцессорной обработки. Но проблема, похоже, заключается в том, что сокет сервера попадает в строку s.accept (), которая просто ждет и никогда не возвращается. Итак, как мне выполнить sh автоматическое тестирование, если я не могу запустить этот сервер в том же коде, что и функция тестирования?
import socket
import multiprocessing as mp
import time,sys
HOST = '127.0.0.1' # Standard loopback interface address (localhost),
PORT = 65432 # Port to listen on (non-privileged ports are > 1023),
FTP_PORT = 63217 # Port for ftp testing, change to 21 for device
def handle_connection(conn,addr):
with conn:
conn.send('Connected by', addr)
print("Got connection")
data = conn.recv(1024)
if not data:
return 'Nodata'
elif (data == b'*IDN?\n'):
print('SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
conn.sendall(b'SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
return 'IDN'
else:
conn.sendall(data)
return 'Data'
def echo_server(c_conn,host=HOST,port=PORT):
# this server simulates the AWG command protocol, simply echoing the command back except for IDN?
p = mp.current_process()
print('Starting echo server:', p.name, p.pid)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
s.listen()
try:
while True:
print("Waiting for connection...")
c_conn.send('waiting for connection...')
conn, addr = s.accept()
handle_connection(conn,addr)
c_conn.send('serving client...')
finally:
conn.close()
c_conn.send('done')
time.sleep(2)
print('Exiting echo server:', p.name, p.pid)
sys.stdout.flush()
def test_echo_server():
print("entering client part")
with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as mysock:
mysock.connect((HOST,PORT))
mysock.sendall('test\n'.encode())
data = mysock.recv(1024)
print('received:',repr(data))
if __name__ == '__main__':
parent_conn, child_conn = mp.Pipe()
echo_demon = mp.Process(name='echo', target=echo_server(child_conn, ))
echo_demon.daemon = True
echo_demon.start()
time.sleep(1)
echo_demon.join(1)
test_echo_server()
if parent_conn.poll(1):
print(parent_conn.recv())
else:
print('Waiting for echo server')