В моем коде я использовал канал ZMQ, мой код создаст отдельный процесс для данного идентификатора процесса. Идентификатор процесса будет передан argv [1] через терминал. Теперь каждый процесс будет транслировать свой собственный идентификатор процесса и будет прослушивать все широковещательные сообщения. При прослушивании он обнаружит активный процесс, присутствующий в канале, и после обнаружения нового активного процесса он добавит идентификатор процесса в свой список процессов.
Код Python 3:
import sys
import os
import asyncio
import socket
import zmq
BL_port = 55555
class Broadcast_Listen():
def __init__(self, port, listener, size=24, poll=1000):
self.port = port
self.listener = listener
self.size = size
self.poll = poll
self.socket = socket.socket(
socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(('', port))
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
async def start(self):
while True:
events = dict(self.poller.poll(self.poll))
if self.socket.fileno() in events:
message, socket = self.socket.recvfrom(self.size)
self.listener(message)
await asyncio.sleep(0)
def send(self, message):
self.socket.sendto(message, 0, ('255.255.255.255', self.port))
return
class Test():
def __init__(self, id):
self.bl_started = asyncio.Event()
self.id = id
self.hb_started = asyncio.Event()
self.process_list = []
async def init_broadcast_listen(self):
self.broadcast_listen = Broadcast_Listen(BL_port, self.broadcast_watch)
self.bl_started.set()
print(f'Broadcast listen Started {self.id}')
await self.broadcast_listen.start()
async def heartbeat(self):
await self.bl_started.wait()
while True:
message = f'heartbeat,{self.id}'
self.broadcast_listen.send(bytes(message, 'utf-8'))
print(f'heartbeat broadcasting: {self.id}, {message}')
await asyncio.sleep(5)
def broadcast_watch(self, message):
msg = str(message, 'utf-8')
msg_list = msg.split(',')
mg = msg_list[0]
id = msg_list[1]
print(f'Received Broadcasting Id: {id}, message: {mg}')
if id not in self.process_list:
self.process_list.append(id)
print(self.process_list)
async def start(self):
await asyncio.gather(
self.init_broadcast_listen(),
self.heartbeat(),
)
async def main():
test = Test(sys.argv[1])
await test.start()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print('exiting')
exit()
class Broadcast_Listen()
используется для создания соединения, а также для передачи сообщения с идентификатором через канал и для прослушивания других сообщений.
class Test()
- основной класс, который будет использоваться для создавать процессы для каждого отдельного идентификатора процесса.
Мне нужно добавить специальную функцию в этот код, т. е. если процесс останавливает свою трансляцию (т. е. процесс завершается), то каждый процесс, который прослушивал эту трансляцию, будет чувствовать что процесс больше не активен и удалите идентификатор процесса из списка процессов. Это означает, что я хочу добавить механизм определения процесса сбоя через сердцебиение асинхронно c. В моем коде я не могу добавить метод обнаружения ошибок. Наиболее важно упомянуть, что мой метод вещания сердцебиения выполнен в асинхронном режиме. Пожалуйста, помогите.