Лучший способ постоянно получать JSON и записывать на диск в зависимости от времени - PullRequest
1 голос
/ 30 октября 2019

Мне трудно придумать решение для постоянного получения JSON и добавления его к фрагменту, а затем периодически на основе установленного времени внутренней записи его на диск. Я пришел к решению, однако чтение / запись в один и тот же фрагмент без какой-либо синхронизации.

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "runtime"
    "strconv"
    "time"
)

type Message map[string]interface{}
type Messages []Message

var (
    messages Messages
)

func main() {
    c := make(chan Message)

    var messages Messages

    go func() {
        tick := time.Tick(200 * time.Millisecond)
        for {
            select {
            case <-tick:
                mb := []byte(`{ "glossary": { "title": "example glossary", "GlossDiv": { "title": "S", "GlossList": { "GlossEntry": { "ID": "SGML", "SortAs": "SGML", "GlossTerm": "Standard Generalized Markup Language", "Acronym": "SGML", "Abbrev": "ISO 8879:1986", "GlossDef": { "para": "A meta-markup language, used to create markup languages such as DocBook.", "GlossSeeAlso": ["GML", "XML"] }, "GlossSee": "markup" } } } } }`)
                var message Message
                json.Unmarshal(mb, &message)
                c <- message
            }
        }
    }()

    go func() {
        tick := time.Tick(30 * time.Second)

        for {
            select {
            case <-tick:
                content, _ := json.Marshal(messages)
                now := time.Now().Unix()
                filename := "test" + strconv.FormatInt(now, 10) + ".json"
                err := ioutil.WriteFile(filename, content, 0644)
                if err != nil {
                    panic(err)
                }

                messages = nil
            }
        }
    }()

    for {
        newmessage := <-c
        messages = append(messages, newmessage)
    }
}


Ожидается 1 файл за 30 секунд, содержащий все сообщения, полученные за последние 30 секунд, без пропущенных сообщений.

Ответы [ 2 ]

0 голосов
/ 31 октября 2019

Вот версия, которая блокирует как можно меньше и пытается минимизировать распределение. Он также относительно прост и портативен.

Он использует один выбор, чтобы избежать дополнительного механизма блокировки, и записывает асинхронный файл json, чтобы предотвратить блокировку потребительского канала.

Существует специальный модуль записи json. рабочий, который предотвращает асинхронный вызов в середине основного выбора, который потребовал бы дополнительного механизма синхронизации JIT.

Я также добавил необходимые механизмы для правильного завершения программы (IE: закрыть производителя, приемник потребления, приемник сообщений, подождите, пока диск не будет записан).

https://play.golang.org/p/5k3DgrceqKH

package main

import (
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
)

type Message map[string]interface{}
type Messages []Message

func main() {
    c := make(chan Message)

    go func() {
        tick := time.Tick(200 * time.Millisecond)
        s := make(chan os.Signal)
        signal.Notify(s)
        for {
            select {
            case <-s:
                close(c)
                return
            case <-tick:
                mb := []byte(`{ "glossary": { "title": "example glossary", "GlossDiv": { "title": "S", "GlossList": { "GlossEntry": { "ID": "SGML", "SortAs": "SGML", "GlossTerm": "Standard Generalized Markup Language", "Acronym": "SGML", "Abbrev": "ISO 8879:1986", "GlossDef": { "para": "A meta-markup language, used to create markup languages such as DocBook.", "GlossSeeAlso": ["GML", "XML"] }, "GlossSee": "markup" } } } } }`)
                var message Message
                json.Unmarshal(mb, &message)
                c <- message
            }
        }
    }()

    var msgPool = sync.Pool{
        New: func() interface{} {
            return Messages{}
        },
    }
    var wg sync.WaitGroup
    wg.Add(1)
    defer wg.Wait()
    cMessages := make(chan Messages)
    go func() {
        defer wg.Done()
        for messages := range cMessages {
            log.Println("saved", len(messages), "messages")
            // content, _ := json.Marshal(messages)
            // now := time.Now().Unix()
            // filename := "test" + strconv.FormatInt(now, 10) + ".json"
            // err := ioutil.WriteFile(filename, content, 0644)
            // if err != nil {
            //     log.Println(err)
            // }
            msgPool.Put(messages)
        }
    }()

    messages := msgPool.Get().(Messages)
    save := time.Tick(1 * time.Second)
    for {
        select {
        case newmessage, ok := <-c:
            if !ok {
                <-save
                cMessages <- messages
                close(cMessages)
                return
            }
            messages = append(messages, newmessage)
        case <-save:
            cMessages <- messages

            messages = msgPool.Get().(Messages)
            messages = messages[:0]
        }
    }
}
0 голосов
/ 31 октября 2019

Ваше решение выглядит хорошо, за исключением части синхронизации:

var (
    messages Messages
    lock sync.Mutex
)

...
 newmessage := <-c
 lock.Lock()
 messages = append(messages, newmessage)
 lock.Unlock()

И для чтения:

case <-tick:
    // Take a snapshot of messages, and reset it
    lock.Lock()
    msgs:=messages
    messages=Messages{}
    lock.Unlock()
    content, _ := json.Marshal(msgs)
    // No need for messages=nil
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...