Как обмениваться данными между двумя разными вызовами API? - PullRequest
0 голосов
/ 28 марта 2020

Я пытаюсь создать среду, в которой я получаю запросы через REST API и жду, пока другой сервис (который работает через gRP C) опрашивает и выполняет запрос. Это необходимо, потому что «другой» сервис очень глубоко встроен в сеть, и я не могу назвать его напрямую. В то же время я хотел бы буферизовать вывод другой службы обратно в источник запроса.

Есть идеи, как я могу разделить эти данные между 2 различными асинхронными запросами API? Использование файловой системы - это способ ... но я подумал, можно ли сделать это лучше по каналам или что-то в этом роде ...?

Ответы [ 2 ]

1 голос
/ 28 марта 2020

Вид псевдокода ниже:

func RestHandler(payload string) (string, error){
    respChan := make(chan string)
    workId := placeWorkInQueue(payload)
    // Start polling in the background
    go pollForResult(respChan, workId)
    // wait for result in the channel
    var result string
    select {
    case result = <-respChan:
        // implement your timeout logic as a another case: here
    }
    return result, nil
}

// This is poller for just the workId given to it.
func pollForResult(respChan chan string, workId string) {
    // Do the polling for workId result 
    /// Write response to respChan when available 
    // You may have to implement a timeout to give up polling.

}

func placeWorkInQueue(s string) string {
    // Place the job in queue and return a unique workId
    return "unique-id"
}
0 голосов
/ 28 марта 2020

Использовать очереди Redis в обоих направлениях. Конечная точка API записывает запрос и уникальный идентификатор в очередь, регистрирует канал Go с уникальным идентификатором в качестве ключа с центральным считывателем в процессе и ожидает на канале Go.

Средство чтения очереди получает ответы с идентификатором из очереди Redis и отправляет ответ на соответствующий канал Go.

// Response represents the response type from the
// remote service.
type Response struct {
    ID string
    // ... more fields as needed
}

// Request represents request to remote serivce.
type Request struct {
    ID string
    // ... more fields as needed
}

// enqueueRequest writes request to Redis queue.
// Implementation TBD by application.
func enqueueRequest(r *Request) error {
    return nil
}

// dequeueResponse reads a response from a Redis queue.
// Implementation TBD by application.
func dequeueResponse() (*Response, error) {
    return nil, nil
}

Используйте syn c .Map для регистрации ожидающих Go каналов от обработчиков запросов API. Ключ является уникальным идентификатором для запроса, а значением является chan *Response.

  var responseWaiters sync.Map

Запустить queuePump в одной процедуре для удаления результатов из очереди Redis и отправки на соответствующий канал. Запустите gorountine перед обслуживанием HTTP-запросов.

func queuePump() {
    for {
        response, err := dequeueResponse()
        if err != nil {
            // handle error
        }
        v, ok := responseWaiters.Load(response.ID)
        if ok {
            c := v.(chan *Response)
            c <- response

            // Remove cahnel from map to ensure that pump never sends 
            // twice to same channel. The pump will black forever if
            // this happens. 
            responseWaiters.Delete(response.ID)
        }
    }
}

Конечная точка API выделяет уникальный запрос для запроса, регистрирует канал с помощью очереди очереди, ставит в очередь запрос и ожидает ответа.

func apiEndpoint(w http.ResponseWriter, r *http.Request) {
    id := generateUniqueID()

    c := make(chan *Response, 1) // capacity 1 ensures that queue pump will not block
    responseWaiters.Store(id, c)
    defer responseWaiters.Delete(id)

    req := &Request{
        ID: id,
        // fill in other fields as needed
    }

    if err := enqueueRequest(req); err != nil {
        // handle error
    }

    select {
    case resp := <-c:
        // process response
        fmt.Println(resp)
    case <-time.After(10 * time.Second):
        // handle timeout error
    }
}
...