У меня есть функция, которая читает данные из source
и отправляет их на destination
. Источником и местом назначения может быть что угодно, скажем, для этого примера источником является база данных (любая MySQL
, PostgreSQL
...), а пунктом назначения является distributed Q
(любая ... ActiveMQ
, Kafka
). Сообщения хранятся в байтах.
Это основная функция. идея заключается в том, что он запустит новую подпрограмму go и будет ожидать возвращения сообщений для дальнейшей обработки.
type Message []byte
func (p *ProcessorService) Continue(dictId int) {
level.Info(p.logger).Log("process", "message", "dictId", dictId)
retrieved := make(chan Message)
go func() {
err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
if err != nil {
level.Error(p.logger).Log("process", "read", "message", "err", err)
}
}()
for r := range retrieved {
go func(message Message) {
level.Info(p.logger).Log("message", message)
if len(message) > 0 {
if err := p.dst.sendToQ(message); err != nil {
level.Error(p.logger).Log("failed", "during", "persist", "err", err)
}
} else {
level.Error(p.logger).Log("failed")
}
}(r)
}
}
и это сама функция чтения
func (s *Storage) Read(out chan<- Message, opt ...string) error {
// I just skip some basic database read operations here
// but idea is simple, read data from the table / file row by row and
//
for _, value := range dataFromDB {
message, err := value.row
if err == nil {
out <- message
} else {
errorf("Unable to get data %v", err)
out <- make([]byte, 0)
}
}
})
close(out)
if err != nil {
return err
}
return nil
}
Как вы можете видеть, связь осуществляется через канал <канал <. Моя проблема в функции продолжения, в частности, здесь </p>
for r := range retrieved {
go func(message Message) {
// basically here message and r are pointing to the same underlying array
}
}
Когда полученные данные var r
является типом байта слайса. Затем он передается в go func(message Message)
все, что передано по значению в go, в этом случае var r
будет передано как копия анонимной забаве c, однако у него все еще будет указатель на данные нижележащего слайса. Мне любопытно, может ли это быть проблемой во время выполнения p.dst.sendToQ(message);
, и в то же время функция чтения отправит что-то в out channel
, что приведет к переопределению структуры данных среза новой информацией. Должен ли я скопировать байтовый фрагмент r
в новый байтовый фрагмент перед передачей в анонимную функцию, чтобы базовые массивы были другими? Я проверил это, но не смог вызвать такое поведение. Не уверен, что я параноик или беспокоюсь об этом.