Как обнаружить сбой узла, используя сердцебиение в python? - PullRequest
0 голосов
/ 30 апреля 2020

В моем коде я использовал канал ZMQ, мой код создаст отдельный процесс для данного идентификатора процесса. Идентификатор процесса будет передан argv [1] через терминал. Теперь каждый процесс будет транслировать свой собственный идентификатор процесса и будет прослушивать все широковещательные сообщения. При прослушивании он обнаружит активный процесс, присутствующий в канале, и после обнаружения нового активного процесса он добавит идентификатор процесса в свой список процессов.

Код Python 3:

import sys
import os
import asyncio
import socket
import zmq

BL_port = 55555

class Broadcast_Listen():
    def __init__(self, port, listener, size=24, poll=1000):
        self.port = port
        self.listener = listener
        self.size = size
        self.poll = poll

        self.socket = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(('', port))

        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

    async def start(self):
        while True:
            events = dict(self.poller.poll(self.poll))
            if self.socket.fileno() in events:
                message, socket = self.socket.recvfrom(self.size)
                self.listener(message)
            await asyncio.sleep(0)

    def send(self, message):
        self.socket.sendto(message, 0, ('255.255.255.255', self.port))
        return

class Test():
    def __init__(self, id):
        self.bl_started = asyncio.Event()
        self.id = id
        self.hb_started = asyncio.Event()
        self.process_list = []

    async def init_broadcast_listen(self):
        self.broadcast_listen = Broadcast_Listen(BL_port, self.broadcast_watch)
        self.bl_started.set()
        print(f'Broadcast listen Started {self.id}')
        await self.broadcast_listen.start()

    async def heartbeat(self):
        await self.bl_started.wait()
        while True:
            message = f'heartbeat,{self.id}'
            self.broadcast_listen.send(bytes(message, 'utf-8'))
            print(f'heartbeat broadcasting: {self.id}, {message}')
            await asyncio.sleep(5)

    def broadcast_watch(self, message):
        msg = str(message, 'utf-8')
        msg_list = msg.split(',')
        mg = msg_list[0]
        id = msg_list[1]
        print(f'Received Broadcasting Id: {id}, message: {mg}')
        if id not in self.process_list:
            self.process_list.append(id)
            print(self.process_list)

    async def start(self):
        await asyncio.gather(
            self.init_broadcast_listen(),
            self.heartbeat(),
        )

async def main():
    test = Test(sys.argv[1])
    await test.start()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('exiting')
        exit()

class Broadcast_Listen() используется для создания соединения, а также для передачи сообщения с идентификатором через канал и для прослушивания других сообщений.

class Test() - основной класс, который будет использоваться для создавать процессы для каждого отдельного идентификатора процесса.

Мне нужно добавить специальную функцию в этот код, т. е. если процесс останавливает свою трансляцию (т. е. процесс завершается), то каждый процесс, который прослушивал эту трансляцию, будет чувствовать что процесс больше не активен и удалите идентификатор процесса из списка процессов. Это означает, что я хочу добавить механизм определения процесса сбоя через сердцебиение асинхронно c. В моем коде я не могу добавить метод обнаружения ошибок. Наиболее важно упомянуть, что мой метод вещания сердцебиения выполнен в асинхронном режиме. Пожалуйста, помогите.

...