Как правильно реализовать отложенный ответ / тайм-аут между serverHTTP и каналом? - PullRequest
0 голосов
/ 31 мая 2019

У меня есть концепция, которую я не знаю, как мне правильно решить, с минимальным влиянием на систему в Go.

Я делаю 'спулер печати', где клиенты могут звонить наAPI (/ StartJob) для обработки заданий на печать.

Поскольку существует только один принтер, поэтому узким местом является отдельный работник, который обрабатывает каждое задание за раз, но клиенты могут передавать одно задание в любой момент времени, поэтомупросто поставьте в очередь, и работник будет обрабатывать каждую работу за время, которое требуется шаг за шагом.

Способ, которым я делаю это, заключается в том, что ServeHTTP передает работу на канал (обратите внимание, здесь я просто передаю идентификаторесли рабочий будет искать данные печати из этого списка):

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement {jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
  }

Затем работник просто все время запускает и обрабатывает любые поступающие задания. Реального кода здесь немного больше, но в конце он выполняетвнешний процесс:

  func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec ("START /i /b processAndPrint.exe -"+job.jobid)
      }

Дело в том, что внешний процесс может занять некоторое время, иногда его мгновение, но при некоторых обстоятельствах выполнение задачи может занять 1 минутупрежде чем он вернется.

Моя проблема здесь в том, что теперь в serverHTTP я мгновенно переписываюсь с клиентом, не зная, было ли задание первым в очереди и его можно было мгновенно обработать или если оно было поставлено в очередьи, возможно, до обработки будет несколько секунд или минут:

  fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

Я хотел бы дать клиенту до 5 секунд, чтобы получить ответ, если его работа была обработана за это время, илиесли нет, скажите ему, что ему нужно перезвонить позже, чтобы проверить статус своей работы.

Я имел в виду несколько подходов:

  1. В моем QueueElemnt я передаюhttp.ResponseWriter, так что я могу писать в ответчик (ответ обратно клиенту) от рабочего.Это я могу сделать, если я оставлю serveHTTP в спящем режиме, так как ResponseWriter завершит работу, когда подпрограмма go существует.Поэтому здесь мне нужно было бы подождать в serveHTTP, а затем, когда его ожидание, работнику разрешено писать в ResponseWriter.

    Проблема с этим подходом состоит в том, что, если задание находится на расстоянии нескольких минут, работник не будет писатьчто-нибудь для этого ResponseWriter, и serveHTTP не будет знать, был ли отправлен ответ от работника.

  2. Я мог бы создать канал для каждого элемента QueueElement, чтобы serveHTTP, а не только работникно фактическое задание, если оно обрабатывается работником, может общаться друг с другом.

    Этот подход я не проверял, но я также беспокоюсь, что он излишний и тяжелый для системы, поскольку мы можем иметьситуация, когда у нас появляется много-много запросов API и, следовательно, большая очередь, которая обрабатывается, так что, хотя мне нужно было бы тайм-аут / отменить его через 5 секунд, я думаю, что концепция излишняя?

  3. Я мог бы передать мьютексированное значение в queueElement, которое оба serveHTTP могли проверять до 5 секундs и очередь может проверять / манипулировать, но в случае, если задание завершено, queueElement исчезает, поэтому это может привести к конфликтам.

  4. Я мог бы сделать вариант № 1), где я пишумой собственный обработчик ответов и используйте флаг, если что-то уже было записано в него, так что serveHTTP проверит до 5 секунд, чтобы проверить, не написал ли Worker ответ клиенту, и в этом случае выйдет serveHTTP без ответаили, в случае отсутствия записи, serveHTTP запишет сообщение обратно клиенту, немного по строчке this .

Но я не чувствую ничего из этогоочень плавный, и я не хочу запускать бесконечное количество подпрограмм или каналов или блокировать себя везде, где есть муксы, поскольку я не знаю, как это влияет на систему.

Может ли кто-нибудь мне помочь?работать правильно, чтобы реализовать такую ​​вещь?Я читал страницу вверх страницу вниз и не нашел хороший и чистый способ достижения этого.

Ответы [ 2 ]

2 голосов
/ 31 мая 2019

Я думаю, что самый простой подход - первый слегка измененный.Вы можете передать http.ResponseWriter работнику, который охватывает другого работника, который фактически выполняет задание, в то время как «родительский» работник ожидает его завершения или тайм-аута.Он ответит HTTP-клиенту, как только одно из двух событий произойдет первым.

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement {writer: w, jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
  }

func worker(jobs <-chan QueueElement) {
    for {
        done := make(chan struct{})
        job := <-jobs

        go func(d chan struct{}, q QueueElement) {
            processExec ("START /i /b processAndPrint.exe -"+q.jobid)
            d <- struct{}{}
        }(done, job)

        select {
            //Timeout
            case <-time.After(time.Second * 5):
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            //Job processing finished in time
            case <-done:
                fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
        }
   }

Вы можете вызвать «ожидающую» процедуру, как только получите HTTP-запрос.Таким образом, таймер тайм-аута будет учитывать всю обработку запроса / задания.

Пример:

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
        newPrintJob := QueueElement{
            doneChan: doneC,
            jobid:    "jobid",
        }
        go func(doneChan chan struct{}) {
            ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
            defer cancel()
            select {
            //If this triggers first, then this waiting goroutine would exit
            //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
            case <-ctx.Done():
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            case <-doneChan:
                fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
            }
        }(doneC)
        gv.printQueue <- newPrintJob

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}

    }
}
0 голосов
/ 31 мая 2019

Я бы не стал долго задерживать запрос, потому что мы не уверены, когда работа будет обработана.

Один подход, который я могу придумать, это:

Изначально ответьте с сервера как принятый / поставленный в очередь и верните job_id.

{
   "job_id": "1",
   "status": "queued"
}

Клиент может опрашивать (скажем, каждые 5 секунд) или использовать длинный опрос, чтобы проверить состояние задания.

Когда он работает:

{
   "job_id": "1",
   "status": "processing"
}

После завершения:

{
   "job_id": "1",
   "status": "success/failed"
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...