Должны ли работники получать доступ к одной и той же структуре (через указатель) для работы? - PullRequest
0 голосов
/ 04 февраля 2019

Я начинающий суслик, и я написал рабочую очередь слушателя событий для проекта, над которым я работаю.

Я развернул его на промежуточном сервере.После запуска около 100 событий слушатели перестают вызываться при публикации событий.Сервер также не вышел из строя.

Вот моя реализация:

// Event struct 
type Event struct {
  Name string
  Data interface{}
}

// Stream to publish events to
var stream = make(chan *Event, 100)

// Publish sends new event data to the stream by the event name
func Publish(name string, data interface{}) {
  ev := &Event{name, data}
  stream <- ev
}

// Handler provides the interface for all event handlers.
// The Work will be called with the event that it should process
type Handler interface {
  Work(*Event)
}

type worker struct {
  Handler
  Listen chan *Event
  Quit   chan bool
}

// Stop shuts down the worker
func (w *worker) Stop() {
  go func() {
    w.Quit <- true
  }()
}

// Queue of worker Listen channels
type workerQueue chan chan *Event

// registry of workers
var registry = make(map[string][]workerQueue)

// Register creates 20 workers, assigns them to a queue, and 
// appends the resulting worker queue to an event on the handler registry
func Register(name string, handlers ...Handler) {
  if _, ok := registry[name]; !ok {
    registry[name] = make([]workerQueue, 0)
  }

  // Create workerQueues for each handler
  for _, h := range handlers {
    queue := make(workerQueue, numListeners)

    // Create 20 workers
    for i := 0; i < 20; i++ {
      newWorker := worker{
        Handler: h,
        Listen:  make(chan *Event),
        Quit:    make(chan bool),
      }

      go func() {
        for {
          select {
          case ev := <-newWorker.Listen:
            nl.Work(ev)
          case <-newWorker.Quit:
            return
          }
        }
      }()

      queue <- newWorker.Listen
    }

    registry[name] = append(registry[name], queue)
  }
}

// Start begins listening for events on stream
func Start() {
  go func() {
    for {
      select {
      // listen for events
      case ev := <-stream:
        go func() {
          // get registered queues for the event
          queues, ok := registry[ev.Name]

          if !ok {
            return
          }

          // Get worker channel from queue and send the event
          for _, queue := range queues {
            worker := <-queue
            worker <- ev
          }
        }()
      }
    }
  }()
}

Вот пример использования.

// Usage

Start()

type demoHandler struct {
  db *sql.DB
}

type eventData struct {}

func (h *demoHandler) Work(ev *Event) {
  // Do something
  return
}

// Register handler
Register('some-event', &demoHandler{r})

Publish('some-event', &eventData{})

Я передаю указатель на demoHandler как обработчик событий, потому что им нужен доступ к базовому экземпляру sql.Проблема в том, что каждая рабочая очередь использует один и тот же demoHandler?

Я не могу на всю жизнь понять, где я ошибся!Если не считать ошибки в коде обработчика, есть ли в моем коде ошибка, из-за которой все мои работники вышли из строя?

1 Ответ

0 голосов
/ 04 февраля 2019

"Должны ли работники получать доступ к одной и той же структуре (через указатель) для выполнения работы?"Нет, это не проблема.Было бы проблемой, если бы код внутри вашего обработчика получил доступ к критическому разделу , но я думаю, что это не вызывает блокировку вашей программы.

Ваш сервер не падает или не блокируется, потому что нетзапускается паника, и ваша программа слушает и выполняет по отдельным процедурам, которые являются легкими потоками выполнения.

Вероятно, это связано с каналами, которые вы используете для отправки и получения событий.

Отправка и получение на канал блокируются по умолчанию.Это означает, что когда вы отправляете или получаете от канала, он будет блокировать свои процедуры до тех пор, пока другая сторона не будет готова.

В случае буферизованных каналов отправляет блок, когда буфер заполнен,и получает блок, когда буфер пуст, как в вашем потоковом канале:

var stream = make(chan *Event, 100)

Вы сказали: «После того, как было запущено около 100 событий, слушатели перестают вызываться при публикации событий».

Так что если вы вызываете функцию Publish и делаете stream <- ev, когда буфер канала «stream» заполнен, он будет блокироваться до тех пор, пока канал не найдет место для приема другого элемента.

Я бы предложил немного прочитатьо неблокирующих операциях канала .

Возможно, блокировка происходит в какой-то части вашего реального кода использования.

...