Выход из канала в реализации пула рабочих - PullRequest
0 голосов
/ 10 мая 2019

То, что я в конечном итоге хочу выполнить, - это динамически увеличивать или уменьшать моих работников в зависимости от рабочей нагрузки.

Приведенный ниже код успешно анализирует данные, когда Задача проходит через w.Channel

func (s *Storage) StartWorker(w *app.Worker) {

    go func() {

        for {

            w.Pool <- w.Channel // register current worker to the worker pool

            select {

            case task := <-w.Channel: // received a work request, do some work

                time.Sleep(task.Delay)

                fmt.Println(w.WorkerID, "processing task:", task.TaskName)

                w.Results <- s.ProcessTask(w, &task)

            case <-w.Quit:
                fmt.Println("Closing channel for", w.WorkerID)
                return
            }

        }
    }()

}

Точкой блокировки здесь является строка ниже.

w.Pool <- w.Channel

В этом смысле, если я пытаюсь остановить работника (-ов) в любой части моей программы с помощью:

w.Quit <- true

case <-w.Quit: блокируется и никогда не получает доесть еще одна входящая задача на w.Channel (и я предполагаю, что оператор выбора здесь является случайным для каждого выбора случая).

Так как я могу остановить канал (рабочий) независимо?

1 Ответ

0 голосов
/ 10 мая 2019

См. Пример кода ниже, он объявляет функцию разветвления, которая отвечает за увеличение / уменьшение рабочих.

Он работает с использованием тайм-аутов, чтобы определить, что новые рабочие закончили или должны появиться.

есть внутренний цикл, чтобы гарантировать, что каждый элемент обрабатывается, прежде чем двигаться дальше, блокируя источник, когда это необходимо.

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
)

func main() {
    input := make(chan string)
    fanout(input)
}

func fanout() {
    workers := 0
    distribute := make(chan string)
    workerEnd := make(chan bool)
    for i := range input {
        done := false 
        for done {
            select {
            case distribute<-i:
                done = true
            case <-workerEnd:
                workers--
            default:
                if workers <10 {
                    workers++
                    go func(){
                        work(distribute)
                        workerEnd<-true
                    }()
                }
            }   
        }
    }
}

func work(input chan string) {
    for  {
        select {
        case i := <-input:
            <-time.After(time.Millisecond)
        case <-time.After(time.Second):
            return
        }

    }
}
...