концепция диспетчера очереди печати / API и каналы: проблема передачи заданий в очередь из serveHTTP - PullRequest
0 голосов
/ 01 июня 2019

Мне уже здесь помогли, что заставило меня продвинуться немного вперед в этой концепции, я пытаюсь, но она все еще не вполне работает, и я столкнулся с конфликтом, который, кажется, не могу обойти.

Я пытался здесьпроиллюстрируйте то, что я хочу, на блок-схеме - обратите внимание, что клиент может иметь много клиентов, которые будут отправлять сообщения с помощью printjobs, поэтому мы не можем ответить на работника, который обрабатывает нашу работу в данный момент, но для большинства это будет (в пиковые моменты временипоскольку обработка задания печати может занять некоторое время).

enter image description here

type QueueElement struct {
    jobid string
    rw   http.ResponseWriter
  doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf ("incoming\r\n")

            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
            newPrintJob := QueueElement{
                    doneChan: doneC,    
                    jobid:    "jobid",
            }

            gv.jobs <- newPrintJob
            func(doneChan chan struct{},w http.ResponseWriter) {

                  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                    defer cancel()
                    select {
                    //If this triggers first, then this waiting goroutine would exit
                    //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
                    case <-ctx.Done():
                            fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
                            fmt.Printf ("took longer than 5 secs\r\n")
                    case <-doneChan:
                            fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
                            fmt.Printf ("instant\r\n")
                    }
            }(doneC,w)

    default:
            fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
            job := <-jobs
            processExec ("START /i /b try.cmd")
            fmt.Printf ("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
            job.doneChan <- struct{}{}

    }
}

func main() {
      db, err := sql.Open("sqlite3", "jobs.sqlite")
      if err := db.Ping(); err != nil {
        log.Fatal(err)
    }   

      db.SetMaxOpenConns(1) // prevents locked database error
      _, err = db.Exec(setupSQL)
      if err != nil {
          log.Fatal(err)
      }

    // create a GlobalVars instance
    gv := GlobalVars{
        db  :   db,
        jobs: make(chan QueueElement),
    }
    go worker (gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr : ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

Приведенный выше код служит службой HTTP и работнику с помощью здесь,изначально func внутри ServeHTTP был рутиной, и здесь у меня возникает весь конфликт - концепция в том, что в serveHTTP он порождает процесс, который получит ответ от работника, если работник смог обработать работу вовремя,в течение 5 секунд.

Если работа была завершена в течение 1 секунды, я хочу сразу же ответить после этого 1 сек.и клиенту, если требуется 3, я хочу ответить через 3, если требуется более 5, я отправлю ответ через 5 секунд (если задание занимает 13 секунд, я все еще хочу ответить через 5 секунд) - что клиентс этого момента он должен опрашивать задание - но конфликт таков:

a) Когда ServeHTTP завершает работу - тогда ResponseWriter закрывается - и чтобы иметь возможность ответить клиенту, мы должны написать ответ наResponseWriter.

b) если я блокирую serveHTTP (как в примере кода ниже, где я не вызываю func как подпрограмму go), то это влияет не только на этот единственный вызов API, но и на то, что все остальные после этогобудет затронут, поэтому первый входящий вызов будет правильно обслужен вовремя, но вызовы, которые будут поступать в то же время после первого, будут последовательно задерживаться операцией блокировки.

c) если я незаблокируйте его ex - и измените его на процедуру go:

    gv.jobs <- newPrintJob
    go func(doneChan chan struct{},w http.ResponseWriter) {

Тогда нет задержки - можно вызвать во многих API - но проблема здесь в том, чтобы служить HTTTP exists сразу же и тем самым убивает ResponseWriter, а затем я не в состоянии ответить клиенту!

Это действительно вызывает у меня головную боль от того, как я могу обойти этот конфликт, когда я не вызываю блокировку для обслуживания HTTP, поэтому я могуобрабатывать все запросы параллельно, но все еще в состоянии ответить обратно на рассматриваемый ResponseWriter.

Есть ли в любом случае предотвращение serveHTTP выключения устройства записи ответов, даже если функция завершается?

Ответы [ 3 ]

2 голосов
/ 02 июня 2019

Да, вы правы с вашей точкой "в) если я не заблокирую ее" .

Чтобы сохранить средство записи ответов, вы не должны вызывать подпрограмму go внутри него. Скорее вы должны вызывать ServeHTTP в качестве подпрограммы, что делает большинство реализаций http-сервера.
Таким образом, вы не будете блокировать какие-либо вызовы API, каждый вызов API будет выполняться в своей отдельной подпрограмме, блокируемой их функциями.

Поскольку ваш "jobs chan QueueElement" является одним каналом (не буферизованным каналом), поэтому все ваши процессы блокируются на "gv.jobs <- newPrintJob" </strong>.
Лучше использовать буферный канал, чтобы все вызовы API могли добавить его в очередь и получить ответ в зависимости от завершения работы или времени ожидания.

Наличие буферизованного канала имитирует реальный лимит памяти принтеров. (длина очереди 1 является особым случаем)

1 голос
/ 02 июня 2019

Я добавил некоторые обновления в ваш код.Теперь все работает так, как вы описали.

package main

import (
    "database/sql"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
)

type QueueElement struct {
    jobid    string
    rw       http.ResponseWriter
    doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf("incoming\r\n")

        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine

        go func(doneChan chan struct{}, w http.ResponseWriter) {
            gv.jobs <- QueueElement{
                doneChan: doneC,
                jobid:    "jobid",
            }
        }(doneC, w)

        select {
        case <-time.Tick(time.Second * 5):
            fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            fmt.Printf("took longer than 5 secs\r\n")
        case <-doneC:
            fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
            fmt.Printf("instant\r\n")
        }
    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        fmt.Println("START /i /b try.cmd")
        fmt.Printf("job done")

        randTimeDuration := time.Second * time.Duration(rand.Intn(7))

        time.Sleep(randTimeDuration)

        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}
    }
}

func main() {

    // create a GlobalVars instance
    gv := GlobalVars{
        //db:   db,
        jobs: make(chan QueueElement),
    }
    go worker(gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr:    ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}
1 голос
/ 01 июня 2019

select оператор должен находиться вне функции goroutine и блокировать запрос до конца выполнения задания или по истечении времени ожидания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...