Правильно пакетировать элементы из входного канала - PullRequest
0 голосов
/ 20 декабря 2018

Вариант использования

Я хочу сохранить много данных в базе данных MySQL, которые я получаю по каналу.По соображениям производительности я обрабатываю их партиями по 10 штук.Я получаю входные данные только каждые 3 часа.

Проблема

Предполагая, что я получу 10004 предмета, останется 4 предмета, потому что моя процедура go ждет 10 предметов, прежде чем она "сбросит их" в партии,Я хочу убедиться, что он создает пакет с менее чем 10 элементами на случай, если в этом канале больше нет элементов (канал также будет закрыт производителем).

Код:

// ProcessAudits sends the given audits in batches to SQL
func ProcessAudits(done <-chan bq.Audit) {
    var audits []bq.Audit
    for auditRow := range done {
        user := auditRow.UserID.StringVal
        log.Infof("Received audit %s", user)
        audits = append(audits, auditRow)

        if len(audits) == 10 {
            upsertBigQueryAudits(audits)
            audits = []bigquery.Audit{}
        }
    }
}

Я новичок в Go, и я не уверен, как бы я это правильно реализовал?

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Также вы можете использовать таймер.Поиграйте с примером здесь https://play.golang.org/p/0atlGVCL-px

0 голосов
/ 20 декабря 2018

Вот рабочий пример.Когда канал закрыт, диапазон выходит, поэтому вы можете просто обработать любые оставшиеся элементы после цикла.

package main

import (
    "fmt"
    "sync"
)

type Audit struct {
    ID int
}

func upsertBigQueryAudits(audits []Audit) {
    fmt.Printf("Processing batch of %d\n", len(audits))
    for _, a := range audits {
        fmt.Printf("%d ", a.ID)
    }
    fmt.Println()
}

func processAudits(audits <-chan Audit, batchSize int) {
    var batch []Audit
    for audit := range audits {
        batch = append(batch, audit)
        if len(batch) == batchSize {
            upsertBigQueryAudits(batch)
            batch = []Audit{}
        }
    }
    if len(batch) > 0 {
        upsertBigQueryAudits(batch)
    }
}

func produceAudits(x int, to chan Audit) {
    for i := 0; i < x; i++ {
        to <- Audit{
            ID: i,
        }
    }
}

const batchSize = 10

func main() {
    var wg sync.WaitGroup
    audits := make(chan Audit)
    wg.Add(1)
    go func() {
        defer wg.Done()
        processAudits(audits, batchSize)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        produceAudits(25, audits)
        close(audits)
    }()
    wg.Wait()
    fmt.Println("Complete")
}

Вывод:

Processing batch of 10
0 1 2 3 4 5 6 7 8 9
Processing batch of 10
10 11 12 13 14 15 16 17 18 19
Processing batch of 5
20 21 22 23 24
Complete
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...