Как создать сервер для постоянного потока (он же pubsub) в Golang GRPC - PullRequest
2 голосов
/ 29 октября 2019

Я создаю службу, которая должна отправлять события всем подписавшимся потребителям способом Pub / Sub, например. отправить одно событие на всем подключенным в данный момент клиентам .

Для этого я использую Protobuf со следующим прототипом:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}

Сервер и клиент написаны на Go.

Моя проблема в том, что когда клиент инициирует соединение, тогда поток не является долгоживущим, например. когда сервер возвращается из метода ListenForEvents:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
    //persist listener here so it can be used later when backend needs to send some messages to client

    return nil
}

, тогда клиент почти мгновенно получает ошибку EOF, которая означает, что сервер, вероятно, закрыл соединение.

Что мне делать, чтобы клиентдавно подписан на сервер? Основная проблема заключается в том, что мне может не хватить ничего для отправки клиенту при вызове метода ListenForEvents на сервере, поэтому я хочу, чтобы этот поток был долгоживущим , чтобыиметь возможность отправлять сообщения позже.

Ответы [ 3 ]

0 голосов
/ 29 октября 2019

Необходимо предоставить настройки keepalive для клиента и сервера grpc

0 голосов
/ 29 октября 2019

Мне удалось это зафиксировать.

По сути, я никогда не возвращаюсь из метода ListenForEvents. Он создает канал, сохраняется в глобальной карте подписанных клиентов и продолжает читать с этого канала бесконечно.

Вся реализация логики сервера:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
    chans, exists := e.listeners[process.Hostname]

    chanForThisClient := make(chan *pb.Event)

    if !exists {
        e.listeners[process.Hostname] = []chan *pb.Event{chanForThisClient}
    } else {
        e.listeners[process.Hostname] = append(chans, chanForThisClient)
    }

    for {
        select {
        case <-listener.Context().Done():
            return nil
        case res := <-chanForThisClient:
            _ = listener.Send(res)
        }
    }

    return nil
}
0 голосов
/ 29 октября 2019

Поток прекращается, когда вы возвращаетесь из функции сервера. Вместо этого вы должны каким-то образом получать события и отправлять их клиенту, не возвращаясь с вашего сервера. Вероятно, есть много способов сделать это. Ниже приведен эскиз одного из способов сделать это.

Это зависит от соединения с сервером, работающего в отдельной программе. Существует функция Broadcast (), которая отправляет сообщения всем подключенным клиентам. Это выглядит так:

var allRegisteredClients map[*pb.AgentProcess]chan Message
var clientsLock sync.RWMutex{}

func Broadcast(msg Message) {
  clientsLock.RLock()
  for _,x:=range allRegisteredClients {
      x<-msg
  }
  clientsLock.RUnlock()
}

Затем ваши клиенты должны зарегистрироваться и обработать сообщения:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
   clientsLock.Lock()
   ch:=make(chan Message)
   allRegisteredClients[process]=ch
   clientsLock.Unlock()

   for msg:=range ch {
       // send message
       // Deal with errors
       // Deal with client terminations
   }
   clientsLock.Lock()
   delete(allRegisteredClients,process)
   clientsLock.Unlock()
}

Как я уже сказал, это всего лишь набросок идеи.

...