Я пытаюсь построить конвейер для приема данных с Go. 3 этапа: «Загрузка пакетов», «Преобразование каждого сообщения» и «Добавление сообщений в очередь».
Логика c, которая мне показалась естественной, состоит в том, чтобы создать 3 функции для каждого этапа и t ie эти функции с небуферизованными каналами.
Где-то в моем коде я неправильно реализую каналы или не использую группы ожидания? Поскольку только 1 сообщение попадает в финальную стадию, и программа, кажется, останавливается / блокируется.
func (c *worker) startWork() {
// channel for messages to be sent to the queue
chMessagesToEnqueue := make(chan types.Foo)
// channel for messages to be transformed
chMessagesToTransform := make(chan []types.UpstreamFooType)
// start the goroutines with the channels
go c.startTransformer(chMessagesToTransform, chMessagesToEnqueue)
go c.startEnqueuer(chMessagesToEnqueue)
go c.startDownloader(chMessagesToTransform)
}
func (c *worker) startDownloader(out chan []types.UpstreamFooType) {
// https://github.com/SebastiaanKlippert/go-soda
// uses a library here to fetch data from upstream APIs, but the gist is:
var wg sync.WaitGroup
for i := 0; i < c.workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
// i've cut out some meat out to make more concise
var results []types.UpstreamFooType
err = json.NewDecoder(resp.Body).Decode(&results)
out <- results
}
}()
}
wg.Wait()
}
func (c *worker) startTransformer(in <-chan []types.UpstreamFooType, out chan types.Foo) {
data := <-in
for _, record := range data {
msg := types.Foo{
name: record.fifa,
}
out <- msg
}
}
func (c *worker) startEnqueuer(in <-chan []types.Foo) {
data := <-in
c.logger.Infow("startEnqueuer", "data", data)
}