Как передать срез байтов между go подпрограммами, используя каналы - PullRequest
0 голосов
/ 06 января 2020

У меня есть функция, которая читает данные из source и отправляет их на destination. Источником и местом назначения может быть что угодно, скажем, для этого примера источником является база данных (любая MySQL, PostgreSQL ...), а пунктом назначения является distributed Q (любая ... ActiveMQ, Kafka). Сообщения хранятся в байтах.

Это основная функция. идея заключается в том, что он запустит новую подпрограмму go и будет ожидать возвращения сообщений для дальнейшей обработки.

type Message []byte

func (p *ProcessorService) Continue(dictId int) {
    level.Info(p.logger).Log("process", "message", "dictId", dictId)
    retrieved := make(chan Message)

    go func() {
        err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
        if err != nil {
            level.Error(p.logger).Log("process", "read", "message", "err", err)
        }
    }()

    for r := range retrieved {
        go func(message Message) {
            level.Info(p.logger).Log("message", message)
            if len(message) > 0 {
                if err := p.dst.sendToQ(message); err != nil {
                    level.Error(p.logger).Log("failed", "during", "persist", "err", err)
                }
            } else {
                level.Error(p.logger).Log("failed")
            }
        }(r)
    }
}

и это сама функция чтения

func (s *Storage) Read(out chan<- Message, opt ...string) error {

    // I just skip some basic database read operations here
    // but idea is simple, read data from the table / file row by row and 
    // 
    for _, value := range dataFromDB {
            message, err := value.row 
            if err == nil {
                out <- message
            } else {
                errorf("Unable to get data %v", err)
                out <- make([]byte, 0)
            }
        }
    })

    close(out)

    if err != nil {
        return err
    }

    return nil
}

Как вы можете видеть, связь осуществляется через канал <канал <. Моя проблема в функции продолжения, в частности, здесь </p>

for r := range retrieved { 
   go func(message Message) {
       // basically here message and r are pointing to the same underlying array
   }
}

Когда полученные данные var r является типом байта слайса. Затем он передается в go func(message Message) все, что передано по значению в go, в этом случае var r будет передано как копия анонимной забаве c, однако у него все еще будет указатель на данные нижележащего слайса. Мне любопытно, может ли это быть проблемой во время выполнения p.dst.sendToQ(message);, и в то же время функция чтения отправит что-то в out channel, что приведет к переопределению структуры данных среза новой информацией. Должен ли я скопировать байтовый фрагмент r в новый байтовый фрагмент перед передачей в анонимную функцию, чтобы базовые массивы были другими? Я проверил это, но не смог вызвать такое поведение. Не уверен, что я параноик или беспокоюсь об этом.

1 Ответ

0 голосов
/ 06 января 2020

message в p.dst.sendToQ(message) - это тот же фрагмент, что и value.row, когда вы получаете данные из БД. Итак, если каждый value.row имеет свой базовый массив, у вас все будет хорошо. Итак, я предлагаю вам проверить исходный код и убедиться, что он не использует общий байтовый массив и продолжает переписывать его.

...