Как эффективно объединить каналы? - PullRequest
0 голосов
/ 03 мая 2018

Я пытаюсь создать концентратор сообщений в Голанге. Сообщения поступают по разным каналам, которые сохраняются в map[uint32]chan []float64. Я делаю бесконечный цикл по карте и проверяю, есть ли у канала сообщение. Если это так, я записываю его в общий канал записи клиента вместе с идентификатором входящего канала. Он работает нормально, но использует все ресурсы процессора, а другие процессы замедляются.

UPD: элементы на карте добавляются и удаляются динамически с помощью другой функции.

Я думаю ограничить процессор для этого приложения через Docker, но, может быть, есть более элегантный путь?

Мой код:

    func (c *Client) accumHandler() {

    for !c.stop {
        c.channels.Range(func(key, value interface{}) bool {

            select {
            case message := <-value.(chan []float64):
                mess := map[uint32]interface{}{key.(uint32): message}

                select {
                case c.send <- mess:

                }

            default:

            }
            return !c.stop
        })
    }
}

Ответы [ 2 ]

0 голосов
/ 03 мая 2018

Он съедает весь ваш ЦП, потому что вы постоянно циклически проверяете сообщения в словаре, поэтому даже когда нет сообщений для обработки ЦП, или, по крайней мере, поток или ядро, работает на пределе. Вам нужно блокировать отправку и получение по каналам!

Я предполагаю, что вы делаете это, потому что вы не знаете, сколько будет каналов, и поэтому не можете просто select на всех входных каналах. Лучше было бы запустить отдельную процедуру для каждого входного канала, который вы сейчас храните в словаре. Каждая процедура должна иметь цикл, в котором она блокирует ожидание входного канала и при получении сообщения выполняет блокировку отправки на канал клиенту, который является общим для всех.

Вопрос не полный, поэтому не могу дать точный код, но у вас будут программы, которые выглядят примерно так:

type Message struct {
    id uint32,
    message []float64
}

func receiverGoroutine(id uint32, input chan []float64, output chan Message) {
    for {
        message := <- input
        output <- Message{id: id, message: message}
    }
}

func clientGoroutine(c *Client, input chan Message) {
    for {
        message := <- input
        // do stuff
    }
}

(Вам также нужно добавить несколько готовых каналов)

В другом месте вы начнете их с кода, подобного этому:

clientChan := make(chan Message)
go clientGoroutine(client, clientChan)

for i:=0; i<max; i++ {
    go receiverGoroutine( (uint32)i, make(chan []float64, clientChan)
}

Или вы можете просто запустить клиентскую подпрограмму, а затем добавить другие по мере необходимости, а не в цикле - это зависит от вашего варианта использования.

0 голосов
/ 03 мая 2018

Если я правильно читаю карты, похоже, что вы пытаетесь передать массив с плавающей точкой в ​​общий канал вместе с идентификатором канала. Я предполагаю, что вы делаете это для передачи нескольких каналов разным издателям, но вам нужно отслеживать только один канал для вашего потребителя.

Оказывается, вам не нужно циклически переключаться между каналами, чтобы увидеть, когда он выводит значение. Вы можете связать каналы вместе внутри горутин. По этой причине нет необходимости в занятом ожидании. Нечто подобное будет соответствовать вашим целям (опять же, если я правильно читаю карты). Посмотрите на комментарий всех заглавных букв для обхода вашего занятого цикла. Ссылка на игровую площадку .

var num_publishes = 3

func main() {
  num_publishers := 10
  single_consumer := make(chan []float64)

  for i:=0;i<num_publishers;i+=1 {
    c := make(chan []float64)

    // connect channel to your single consumer channel
    go func() { for { single_consumer <- <-c } }() // THIS IS PROBABLY WHAT YOU DIDN'T KNOW ABOUT

    // send the channel to the publisher
    go publisher(c, i*100)
  }

  // dumb consumer example
  for i:=0;i<num_publishers*num_publishes;i+=1 {
    fmt.Println(<-single_consumer)
  }
}

func publisher(c chan []float64, publisher_id int) {
  dummy := []float64{
    float64(publisher_id+1),
    float64(publisher_id+2),
    float64(publisher_id+3),
  }
  for i:=0;i<num_publishes;i+=1 {
    time.Sleep(time.Duration(rand.Intn(10000)) * time.Millisecond)
    c <- dummy
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...