Я реализую алгоритм логических часов Lamports в Python. Поэтому мне пришлось создать три процесса {process1.py, process2.py, process3.py} на одной машине и запускать их параллельно в одно и то же время, чтобы каждый обрабатывать многоадресные события для всех других процессов. Я использую сценарий оболочки для параллельного запуска этих файлов Python. Я использую сокеты для установления связи между ними, используя разные порты. Код всех трех процессов остается тем же, за исключением того, что они прослушивают сообщения на разных портах.
Поскольку все процессы одинаковы и инициируют связь одновременно, я получаю сообщение об ошибке «Отказано в соединении». Я знаю, что это может произойти, когда сервер не работает в случае связи клиент-сервер. Но в моем случае все компоненты являются серверами. Любая помощь с благодарностью.
process1.py
import socket
import math
import time
import threading
class process1:
# send message to p2 and p3.
def send_message(self):
buffer = [1.1]
ack_buf = []
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(True)
sock.bind(('localhost',6000))
sock.connect(('localhost',6001))
message1 = {'payload' : 1.1}
sock.send(str(message1).encode('utf8'))
data = sock.recv(1024)
var = eval(data.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], sock)).start()
elif 'ack' in var:
ack_buf.append(var['ack'])
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('localhost', 6002))
s.send(str(message1).encode('utf8'))
data_recv = sock.recv(1024)
var = eval(data_recv.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], s)).start()
elif 'ack' in var:
ack_buf.append(var['ack'])
self.print_events(buffer)
self.validate(buffer, ack_buf)
# sends acknowlegment to all the processes in the distributed system.
def send_ack(self,event, socket):
if event>0 and socket is not None:
socket.send(str({'ack':event}).encode('utf8'))
# prints the events in the sorted order
def print_events(self,buffer):
sorted(buffer)
for x in range(len(buffer)):
m = buffer[x]
frac, whole = math.modf(m)
each_event = int(frac*10)
print('Event %d of process %d is happening according to the received order'%(each_event, whole))
# validates if the msgs sent are received by the processes
def validate(self, list_of_events, recv_buf):
count = 0
if len(list_of_events) > 0 and len(recv_buf) > 0:
for x in range(len(list_of_events)):
if list_of_events[x] in recv_buf:
count += 1
if count == 2:
print("Event %d is sent and acknowledged by the other processes"%(list_of_events[x]))
if __name__ == "__main__":
p1 = process1()
time.sleep(.300)
threading.Thread(target=p1.send_message).start()
process2.py
import socket
import math
import time
import threading
class process2:
# send message to p2 and p3.
def send_message(self):
buffer = [1.1]
ack_buf = []
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(True)
sock.bind(('localhost',6001))
time.sleep(.300)
sock.connect(('localhost',6000))
message1 = {'payload' : 2.1}
sock.send(str(message1).encode('utf8'))
data = sock.recv(1024)
var = eval(data.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], sock)).start()
elif 'ack' in var:
ack_buf.append(var['ack'])
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
time.sleep(.300)
s.connect(('localhost', 6002))
s.send(str(message1).encode('utf8'))
data_recv = sock.recv(1024)
var = eval(data_recv.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], s)).start()
elif 'ack' in var:
ack_buf.append(var['ack'])
self.print_events(buffer)
self.validate(buffer, ack_buf)
# sends acknowlegment to all the processes in the distributed system.
def send_ack(self, event, socket):
if event>0 and socket is not None:
socket.send(str({'ack':event}).encode('utf8'))
# prints the events in the sorted order
def print_events(self, buffer):
sorted(buffer)
for x in range(len(buffer)):
m = buffer[x]
frac, whole = math.modf(m)
each_event = int(frac*10)
print('Event %d of process %d is happening according to the received order'%(each_event, whole))
# validates if the msgs sent are received by the processes
def validate(self, list_of_events, recv_buf):
count = 0
if len(list_of_events) > 0 and len(recv_buf) > 0:
for x in range(len(list_of_events)):
if list_of_events[x] in recv_buf:
count += 1
if count == 2:
print("Event %d is sent and acknowledged by the other processes"%(list_of_events[x]))
if __name__ == "__main__":
p2 = process2()
time.sleep(.300)
threading.Thread(target=p2.send_message).start()
process3.py
import socket
import math
import time
import threading
class process3:
# send message to p2 and p3.
def send_message(self):
buffer = [1.1]
ack_buf = []
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(True)
sock.bind(('localhost',6002))
sock.connect(('localhost',6000))
message1 = {'payload' : 3.1}
sock.send(str(message1).encode('utf8'))
data = sock.recv(1024)
var = eval(data.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], sock)).start()
elif var.has_key('ack'):
ack_buf.append(var['ack'])
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
time.sleep(.300)
s.connect(('localhost', 6001))
s.send(str(message1).encode('utf8'))
data_recv = sock.recv(1024)
var = eval(data_recv.decode('utf8'))
if 'payload' in var:
buffer.append(var['payload'])
threading.Thread(target=self.send_ack, args = (var['payload'], s)).start()
elif 'ack' in var:
ack_buf.append(var['ack'])
self.print_events(buffer)
self.validate(buffer, ack_buf)
# sends acknowlegment to all the processes in the distributed system.
def send_ack(self, event, socket):
if event>0 and socket is not None:
socket.send(str({'ack':event}).encode('utf8'))
# prints the events in the sorted order
def print_events(self, buffer):
sorted(buffer)
for x in range(len(buffer)):
m = buffer[x]
frac, whole = math.modf(m)
each_event = int(frac*10)
print('Event %d of process %d is happening according to the received order'%(each_event, whole))
# validates if the msgs sent are received by the processes
def validate(self, list_of_events, recv_buf):
count = 0
if len(list_of_events) > 0 and len(recv_buf) > 0:
for x in range(len(list_of_events)):
if list_of_events[x] in recv_buf:
count += 1
if count == 2:
print("Event %d is sent and acknowledged by the other processes"%(list_of_events[x]))
if __name__ == "__main__":
p3 = process3()
time.sleep(.300)
threading.Thread(target=p3.send_message).start()
job.sh
python3 process1.py &
python3 process2.py &
python3 process3.py &