Использовать очереди Redis в обоих направлениях. Конечная точка API записывает запрос и уникальный идентификатор в очередь, регистрирует канал Go с уникальным идентификатором в качестве ключа с центральным считывателем в процессе и ожидает на канале Go.
Средство чтения очереди получает ответы с идентификатором из очереди Redis и отправляет ответ на соответствующий канал Go.
// Response represents the response type from the
// remote service.
type Response struct {
ID string
// ... more fields as needed
}
// Request represents request to remote serivce.
type Request struct {
ID string
// ... more fields as needed
}
// enqueueRequest writes request to Redis queue.
// Implementation TBD by application.
func enqueueRequest(r *Request) error {
return nil
}
// dequeueResponse reads a response from a Redis queue.
// Implementation TBD by application.
func dequeueResponse() (*Response, error) {
return nil, nil
}
Используйте syn c .Map для регистрации ожидающих Go каналов от обработчиков запросов API. Ключ является уникальным идентификатором для запроса, а значением является chan *Response
.
var responseWaiters sync.Map
Запустить queuePump в одной процедуре для удаления результатов из очереди Redis и отправки на соответствующий канал. Запустите gorountine перед обслуживанием HTTP-запросов.
func queuePump() {
for {
response, err := dequeueResponse()
if err != nil {
// handle error
}
v, ok := responseWaiters.Load(response.ID)
if ok {
c := v.(chan *Response)
c <- response
// Remove cahnel from map to ensure that pump never sends
// twice to same channel. The pump will black forever if
// this happens.
responseWaiters.Delete(response.ID)
}
}
}
Конечная точка API выделяет уникальный запрос для запроса, регистрирует канал с помощью очереди очереди, ставит в очередь запрос и ожидает ответа.
func apiEndpoint(w http.ResponseWriter, r *http.Request) {
id := generateUniqueID()
c := make(chan *Response, 1) // capacity 1 ensures that queue pump will not block
responseWaiters.Store(id, c)
defer responseWaiters.Delete(id)
req := &Request{
ID: id,
// fill in other fields as needed
}
if err := enqueueRequest(req); err != nil {
// handle error
}
select {
case resp := <-c:
// process response
fmt.Println(resp)
case <-time.After(10 * time.Second):
// handle timeout error
}
}