Вот версия, которая блокирует как можно меньше и пытается минимизировать распределение. Он также относительно прост и портативен.
Он использует один выбор, чтобы избежать дополнительного механизма блокировки, и записывает асинхронный файл json, чтобы предотвратить блокировку потребительского канала.
Существует специальный модуль записи json. рабочий, который предотвращает асинхронный вызов в середине основного выбора, который потребовал бы дополнительного механизма синхронизации JIT.
Я также добавил необходимые механизмы для правильного завершения программы (IE: закрыть производителя, приемник потребления, приемник сообщений, подождите, пока диск не будет записан).
https://play.golang.org/p/5k3DgrceqKH
package main
import (
"encoding/json"
"log"
"os"
"os/signal"
"sync"
"time"
)
type Message map[string]interface{}
type Messages []Message
func main() {
c := make(chan Message)
go func() {
tick := time.Tick(200 * time.Millisecond)
s := make(chan os.Signal)
signal.Notify(s)
for {
select {
case <-s:
close(c)
return
case <-tick:
mb := []byte(`{ "glossary": { "title": "example glossary", "GlossDiv": { "title": "S", "GlossList": { "GlossEntry": { "ID": "SGML", "SortAs": "SGML", "GlossTerm": "Standard Generalized Markup Language", "Acronym": "SGML", "Abbrev": "ISO 8879:1986", "GlossDef": { "para": "A meta-markup language, used to create markup languages such as DocBook.", "GlossSeeAlso": ["GML", "XML"] }, "GlossSee": "markup" } } } } }`)
var message Message
json.Unmarshal(mb, &message)
c <- message
}
}
}()
var msgPool = sync.Pool{
New: func() interface{} {
return Messages{}
},
}
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
cMessages := make(chan Messages)
go func() {
defer wg.Done()
for messages := range cMessages {
log.Println("saved", len(messages), "messages")
// content, _ := json.Marshal(messages)
// now := time.Now().Unix()
// filename := "test" + strconv.FormatInt(now, 10) + ".json"
// err := ioutil.WriteFile(filename, content, 0644)
// if err != nil {
// log.Println(err)
// }
msgPool.Put(messages)
}
}()
messages := msgPool.Get().(Messages)
save := time.Tick(1 * time.Second)
for {
select {
case newmessage, ok := <-c:
if !ok {
<-save
cMessages <- messages
close(cMessages)
return
}
messages = append(messages, newmessage)
case <-save:
cMessages <- messages
messages = msgPool.Get().(Messages)
messages = messages[:0]
}
}
}