Golang (GO) Проблема с каналами при попытке реализовать (реплицировать) сценарий запросов к БД / кэш-памяти - PullRequest
0 голосов
/ 26 апреля 2020

Итак, я новичок в каналах, группах ожидания, мьютексе и т. Д. 1011 *, и попытался создать приложение, которое запрашивает фрагмент структуры для данных и, если оно находит данные, загружает его в карту. Я в основном пытаюсь реплицировать сценарий cache / db (но в настоящее время оба находятся в памяти для простоты понимания).

Теперь, при запросе данных, он запрашивается как из базы данных, так и из кеша, и я ' для этого мы установили RWMutex; но при чтении данных, сохраненных либо в кэш, либо в базу данных, используя другие подпрограммы go (по каналам). Он читает как из (db go -программы), так и из (cache go -программы). Так что я делал каждый раз, когда получал чтение из кеша go -программа я тратил db go -рассмотрение одного элемента.


package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

type Book struct {
    id   int
    name string
}

var cache = map[int]Book{}
var books []Book
var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))

func main() {
    cacheCh := make(chan Book)
    dbCh := make(chan Book)
    wg := &sync.WaitGroup{}
    m := &sync.RWMutex{}
    loadDb()
    for i := 0; i < 10; i++ {
        id := rnd.Intn(10)
        wg.Add(1)
        go func(id int, wg *sync.WaitGroup, m *sync.RWMutex, ch chan<- Book) {
            if find, book := queryCache(id, m); find {
                fmt.Println("Found Book In Cache: ", book)
                ch <- book
            }
            wg.Done()
        }(id, wg, m, cacheCh)
        wg.Add(1)
        go func(id int, wg *sync.WaitGroup, m *sync.RWMutex, ch chan<- Book) {
            if find, book := queryDb(id, m); find {
                ch <- book
            }
            wg.Done()
        }(id, wg, m, dbCh)
        go func(dbCh, cacheCh <-chan Book) {
            var book Book
            select {
            case book = <-cacheCh:
                msg := <-dbCh
                fmt.Println("Drain DbCh From: ", msg, "\nBook From Cache: ", book.name)
            case book = <-dbCh:
                fmt.Println("Book From Database: ", book.name)
            }
        }(dbCh, cacheCh)
    }

    wg.Wait()
}

func queryCache(id int, m *sync.RWMutex) (bool, Book) {
    m.RLock()
    b, ok := cache[id]
    m.RUnlock()
    return ok, b
}
func queryDb(id int, m *sync.RWMutex) (bool, Book) {
    for _, val := range books {
        if val.id == id {
            m.Lock()
            cache[id] = val
            m.Unlock()
            return true, val
        }
    }
    var bnf Book
    return false, bnf

}

func loadDb() {
    var book Book
    for i := 0; i < 10; i++ {
        book.id = i
        book.name = "a" + strconv.Itoa(i)
        books = append(books, book)
    }

}

Кроме того, я понимаю, что в этом коде он всегда будет запрашивать БД, даже если он находит попадание в кеш, что не идеально. Но это всего лишь тестовый сценарий, в котором все, что я поставил в качестве приоритета, заключается в том, что пользователь получает сведения в самом быстром из возможных режимов (ie, если он отсутствует в кэше, он не должен ждать ответа от кеш, прежде чем запрашивать базу данных).

Пожалуйста, помогите, если возможно, я довольно новичок в этом, так что это может быть что-то тривиальное.

Извините и спасибо.

1 Ответ

0 голосов
/ 27 апреля 2020

Проблемы, которые я вижу, в основном связаны с предположениями о порядке, в котором все будет выполняться:

  • Вы предполагаете, что процедуры поиска в БД / кэш go будут возвращаться в порядке; это не гарантируется (теоретически последняя запрошенная книга может быть добавлена ​​в канал первой, а записи в канале кеша, вероятно, будут упорядочены по-другому, чем канал дБ).
  • Приведенный выше пункт отрицает вашу дополнительные msg := <-dbCh (поскольку база данных может вернуть результат до кэша, и в этом случае канал не будет очищен). Это приводит к тупику.
  • Ваш код завершается, когда завершается группа ожидания, и это может произойти и, вероятно, произойдет до завершения процедуры go, которая извлекает результаты (процедура go с select не использует группу ожидания) - на самом деле это не проблема, потому что ваш код заходит в тупик до этой точки.

Код ниже ( детская площадка ) - моя попытка решить эти проблемы, оставляя ваш код в основном как есть (я создал getBook функцию, потому что я думаю, что это облегчает понимание). Я не собираюсь гарантировать, что мой код не содержит ошибок, но, надеюсь, это поможет.

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

type Book struct {
    id   int
    name string
}

var cache = map[int]Book{}
var books []Book
var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // Note: Not random on playground as time is simulated

func main() {
    wg := &sync.WaitGroup{}
    m := &sync.RWMutex{}
    loadDb()
    for i := 0; i < 10; i++ {
        id := rnd.Intn(10)
        wg.Add(1) // Could also add 10 before entering the loop
        go func(wg *sync.WaitGroup, m *sync.RWMutex, id int) {
            getBook(m, id)
            wg.Done() // The book has been retrieved; loop can exit when all books retrieved
        }(wg, m, id)
    }
    wg.Wait() // Wait until all books have been retrieved
}

// getBook will retrieve a book (from the cache or the db)
func getBook(m *sync.RWMutex, id int) {
    fmt.Printf("Requesting book: %d\n", id)
    cacheCh := make(chan Book)
    dbCh := make(chan Book)
    go func(id int, m *sync.RWMutex, ch chan<- Book) {
        if find, book := queryCache(id, m); find {
            //fmt.Println("Found Book In Cache: ", book)
            ch <- book
        }
        close(ch)
    }(id, m, cacheCh)
    go func(id int, m *sync.RWMutex, ch chan<- Book) {
        if find, book := queryDb(id, m); find {
            ch <- book
        }
        close(ch)
    }(id, m, dbCh)

    // Wait for a result from one of the above - Note that we make no assumptions
    // about the order of the results but do assume that dbCh will ALWAYS return a book
    // We want to return from this function as soon as a result is received (because in reality
    // we would return the result)
    for {
        select {
        case book, ok := <-cacheCh:
            if !ok { // Book is not in the cache so we need to wait for the database query
                cacheCh = nil // Prevents select from considering this
                continue
            }
            fmt.Println("Book From Cache: ", book.name)
            drainBookChan(dbCh) // The database will always return something (in reality you would cancel a context here to stop the database query)
            return
        case book := <-dbCh:
            dbCh = nil // Nothing else will come through this channel
            fmt.Println("Book From Database: ", book.name)
            if cacheCh != nil {
                drainBookChan(cacheCh)
            }
            return
        }
    }
}

// drainBookChan starts a go routine to drain the channel passed in
func drainBookChan(bChan chan Book) {
    go func() {
        for _ = range bChan {
        }
    }()
}

func queryCache(id int, m *sync.RWMutex) (bool, Book) {
    time.Sleep(time.Duration(rnd.Intn(100)) * time.Millisecond) // Introduce some randomness (otherwise everything comes from DB)
    m.RLock()
    b, ok := cache[id]
    m.RUnlock()
    return ok, b
}
func queryDb(id int, m *sync.RWMutex) (bool, Book) {
    time.Sleep(time.Duration(rnd.Intn(100)) * time.Millisecond) // Introduce some randomness (otherwise everything comes from DB)
    for _, val := range books {
        if val.id == id {
            m.Lock()
            cache[id] = val
            m.Unlock()
            return true, val
        }
    }
    var bnf Book
    return false, bnf

}

func loadDb() {
    var book Book
    for i := 0; i < 10; i++ {
        book.id = i
        book.name = "a" + strconv.Itoa(i)
        books = append(books, book)
    }

}
...