Безопасный для параллелизма планировщик задач - PullRequest
0 голосов
/ 03 ноября 2019

Я пытаюсь сделать какой-то планировщик задач и рабочий. Задачи добавляются в очередь, из которой работник извлекает и обрабатывает их. Задачи из одной очереди должны выполняться последовательно. Задачи из разных очередей выполняются параллельно. Когда работник выполнил все задачи в очереди, он некоторое время ждет и заканчивает свою работу. Очередь должна быть удалена из графика

Я что-то сделал, но думаю, что это не лучшее решение. Я думаю, что могут быть проблемы, когда работник выключается и закрывает канал.

Безопасно ли мое решение для параллелизма?

// Schedule struct
type Schedule struct {
    sync.RWMutex
    queues map[int]chan Task
    idle   byte
}

// Scheduler pushes task to the queue
func (s *Schedule) Scheduler(t Task, i int) {
    var queue chan Task
    var ok bool

    s.RLock()
    if queue, ok = s.queues[i]; !ok {
        s.RUnlock()
        s.Lock()
        if queue, ok = s.queues[i]; !ok {
            queue = make(chan Task)
            s.queues[i] = queue
            go s.worker(queue, i)
        }
        s.Unlock()
    } else {
        s.RUnlock()
    }

    queue <- t
}

// Worker retrieves task from the queue and process
func (s *Schedule) worker(c chan Task, i int) {
    timeout := time.After(s.idle * time.Second)
    done := false
    for !done {
        select {
        case task := <-c:
            task.Execute()
            timeout = time.After(s.idle * time.Second)

        case <-timeout:
            s.Lock()
            close(c)
            delete(s.queues, i)
            s.Unlock()
            done = true

        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...