Могу ли я заблокировать, используя указанные значения c в Go? - PullRequest
0 голосов
/ 13 февраля 2020

Отвечая на другой вопрос Я написал небольшую структуру, используя sync.Map для кэширования запросов API.

type PostManager struct {
    sync.Map
}

func (pc PostManager) Fetch(id int) Post {
    post, ok := pc.Load(id)
    if ok {
        fmt.Printf("Using cached post %v\n", id)
        return post.(Post)
    }
    fmt.Printf("Fetching post %v\n", id)
    post = pc.fetchPost(id)
    pc.Store(id, post)

    return post.(Post)
}

К сожалению, если две goroutines обе извлекают Одни и те же некэшированные сообщения одновременно, оба будут отправлять запросы.

var postManager PostManager

wg.Add(3)

var firstPost Post
var secondPost Post
var secondPostAgain Post

go func() {
    // Fetches and caches 1
    firstPost = postManager.Fetch(1)
    defer wg.Done()
}()

go func() {
    // Fetches and caches 2
    secondPost = postManager.Fetch(2)
    defer wg.Done()
}()

go func() {
    // Also fetches and caches 2
    secondPostAgain = postManager.Fetch(2)
    defer wg.Done()
}()

wg.Wait()

Я должен убедиться, что при одновременных выборках с одним и тем же идентификатором только одному разрешено фактически выполнить запрос. Другой должен ждать и будет использовать кэшированный пост. Но также не блокировать выборки разных идентификаторов.

В приведенном выше примере я хочу, чтобы был один и только один вызов pc.fetchPost(1) и pc.fetchPost(2), и они должны быть одновременными.

Ссылка на полный код .

Ответы [ 3 ]

2 голосов
/ 13 февраля 2020

Похоже, можно использовать вторую карту для ожидания, если извлечение уже выполняется.

type PostManager struct {
    sync.Map
    q sync.Map
}

func (pc *PostManager) Fetch(id int) Post {
    post, ok := pc.Load(id)
    if ok {
        fmt.Printf("Using cached post %v\n", id)
        return post.(Post)
    }
    fmt.Printf("Fetching post %v\n", id)
    if c, loaded := pc.q.LoadOrStore(id, make(chan struct{})); !loaded {
        post = pc.fetchPost(id)
        pc.Store(id, post)
        close(c.(chan struct{}))
    } else {
        <-c.(chan struct{})
        post,_ = pc.Load(id)
    }
    return post.(Post)
}

Или, немного более сложно, с той же картой; -)

func (pc *PostManager) Fetch(id int) Post {
    p, ok := pc.Load(id)

    if !ok {
        fmt.Printf("Fetching post %v\n", id)
        if p, ok = pc.LoadOrStore(id, make(chan struct{})); !ok {
            fetched = pc.fetchPost(id)
            pc.Store(id, fetched)
            close(p.(chan struct{}))
            return fetched
        }
    }

    if cached, ok := p.(Post); ok {
        fmt.Printf("Using cached post %v\n", id)
        return cached
    }

    fmt.Printf("Wating for cached post %v\n", id)
    <-p.(chan struct{})
    return pc.Fetch(id)
}
1 голос
/ 13 февраля 2020

Пакет golang .org / x / sync / singleflight был написан именно для этой цели.

Обратите внимание, что все обращения к кешу должны происходить внутри переданной функции обратного вызова делать. В коде, на который вы ссылаетесь в своем комментарии, вы выполняете поиск снаружи; это несколько побеждает цель.

Кроме того, вы должны использовать указатель на singleflight.Group. Это источник вашей гонки данных, и go ветеринар указывает на это:

. / Foo. go: 41: 10: fetchPost передает блокировку по значению: command-line-arguments.PostManager содержит golang .org / x / sync / singleflight.Group содержит syn c .Mutex

Вот как бы я написал это (полный пример на игровой площадке: https://play.golang.org/p/2hE721uA88S):

import (
    "strconv"
    "sync"

    "golang.org/x/sync/singleflight"
)

type PostManager struct {
    sf    *singleflight.Group
    cache *sync.Map
}

func (pc *PostManager) Fetch(id int) Post {
    x, _, _ := pc.sf.Do(strconv.Itoa(id), func() (interface{}, error) {
        post, ok := pc.cache.Load(id)
        if !ok {
            post = pc.fetchPost(id)
            pc.cache.Store(id, post)
        }

        return post, nil
    })

    return x.(Post)
}
1 голос
/ 13 февраля 2020

Вы можете сделать это с двумя картами, одна из которых хранит кэшированные значения, а другая - значения, которые выбираются. Вам также нужно было бы держать блокировку немного дольше, чтобы не нужно было синхронизировать c .Map, как это делает обычная карта. Нечто подобное должно работать (не проверено):

type PostManager struct {
    sync.Mutex
    cached map[int]Post
    loading map[int]chan struct{}
}

Вам необходимо обработать случай, когда загрузка не удалась, в следующих случаях:

// Need to pass pointer pc
func (pc *PostManager) Fetch(id int) Post {
    pc.Lock()
    post, ok:=pc.cached[id]
    if ok {
        pc.Unlock()
        return post
    }
    // See if it is being loaded
    loading, ok:=pc.loading[id]
    if ok {
       // Wait for the loading to complete
       pc.Unlock()
       <-loading
       // Reload
       pc.Lock()
       post,ok:=pc.cached[id]
       // Maybe you need to handle the case where loading failed?
       pc.Unlock()
       return post
    }
    // load it
    loading=make(chan struct{})
    pc.loading[id]=loading
    pc.Unlock()
    post = pc.fetchPost(id)
    pc.Lock()
    pc.cached[id]=post
    delete(pc.loading,id)
    pc.Unlock()
    close(loading)
    return post
}
...