/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data. In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)

// A Getter loads data for a key.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	Get(ctx Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
	return f(ctx, key, dest)
}
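
// exampleLookup is an illustrative sketch (hypothetical, not part of
// groupcache's API): it shows one typical way a caller wires a GetterFunc
// into a group and reads a value through it. The group name "example-users"
// and the in-memory backing map are assumptions chosen only for illustration.
func exampleLookup(key string) (string, error) {
	// Hypothetical slow backing store, stood in for by a map.
	backing := map[string]string{"alice": "Alice Example"}

	g := GetGroup("example-users")
	if g == nil {
		// 64 MB combined budget for the group's main and hot caches.
		g = NewGroup("example-users", 64<<20, GetterFunc(
			func(ctx Context, key string, dest Sink) error {
				// Runs only on a cache miss for keys this process owns.
				v, ok := backing[key]
				if !ok {
					return errors.New("groupcache: no such key")
				}
				return dest.SetString(v)
			}))
	}

	// Get fills dest from the local cache, a peer, or the GetterFunc above.
	var s string
	err := g.Get(nil, key, StringSink(&s))
	return s, err
}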

var (
	mu     sync.RWMutex
	groups = make(map[string]*Group)

	initPeerServerOnce sync.Once
	initPeerServer     func()
)

// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
	mu.RLock()
	g := groups[name]
	mu.RUnlock()
	return g
}

// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}

// If peers is nil, the group's peer picker is initialized lazily
// (via a sync.Once) on the group's first Get.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	initPeerServerOnce.Do(callInitPeerServer)
	if _, dup := groups[name]; dup {
		panic("duplicate registration of group " + name)
	}
	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}
	if fn := newGroupHook; fn != nil {
		fn(g)
	}
	groups[name] = g
	return g
}

// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)

// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
	if newGroupHook != nil {
		panic("RegisterNewGroupHook called more than once")
	}
	newGroupHook = fn
}

// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
	if initPeerServer != nil {
		panic("RegisterServerStart called more than once")
	}
	initPeerServer = fn
}

func callInitPeerServer() {
	if initPeerServer != nil {
		initPeerServer()
	}
}
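
// exampleRegisterHooks is an illustrative sketch (hypothetical, not part of
// groupcache's API): both hooks must be registered before the first NewGroup
// call, since the server-start hook fires exactly once when the first group
// is created. What the hooks do here is an assumption for illustration.
func exampleRegisterHooks() {
	RegisterServerStart(func() {
		// Hypothetical place to start serving peer traffic, e.g. an
		// HTTP listener wired to this process's peer pool.
	})
	RegisterNewGroupHook(func(g *Group) {
		// Hypothetical per-group setup, e.g. exporting g.Stats.
		_ = g.Name()
	})
}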

// A Group is a cache namespace whose associated data is loaded and
// spread over a group of one or more machines.
type Group struct {
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size

	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache

	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer. Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup

	// Stats are statistics on the group.
	Stats Stats
}

// flightGroup is an interface satisfied by singleflight.Group. It is
// defined so that tests may substitute an alternate implementation.
type flightGroup interface {
	// Do runs fn at most once per in-flight key, returning its results
	// to all concurrent callers for that key.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}
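
// passthroughFlightGroup is an illustrative sketch (hypothetical, not part of
// groupcache's API): the smallest flightGroup substitute a test might use,
// running fn directly with no de-duplication.
type passthroughFlightGroup struct{}

func (passthroughFlightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	return fn()
}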

// Stats are per-group statistics.
type Stats struct {
	Gets           AtomicInt // any Get request, including from peers
	CacheHits      AtomicInt // either cache (main or hot) had the value
	PeerLoads      AtomicInt // either remote load or remote cache hit (not an error)
	PeerErrors     AtomicInt
	Loads          AtomicInt // (gets - cacheHits)
	LoadsDeduped   AtomicInt // after singleflight
	LocalLoads     AtomicInt // total good local loads
	LocalLoadErrs  AtomicInt // total bad local loads
	ServerRequests AtomicInt // gets that came over the network from peers
}

// Name returns the name of the group.
func (g *Group) Name() string {
	return g.name
}

func (g *Group) initPeers() {
	if g.peers == nil {
		g.peers = getPeers()
	}
}

func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently. It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls. An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially. If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez? It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache. For now just do it some
	// percentage of the time.
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}
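
// mapProtoGetter is an illustrative sketch (hypothetical, not part of
// groupcache's API): an in-memory ProtoGetter a test might use in place of a
// real network peer, answering every request from a fixed map of canned
// responses rather than another process.
type mapProtoGetter struct {
	data map[string][]byte // hypothetical canned responses, keyed by cache key
}

func (p *mapProtoGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) error {
	v, ok := p.data[in.GetKey()]
	if !ok {
		return errors.New("mapProtoGetter: no such key")
	}
	out.Value = v
	return nil
}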

func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return
	}
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	value, ok = g.hotCache.get(key)
	return
}

func (g *Group) populateCache(key string, value ByteView, cache *cache) {
	if g.cacheBytes <= 0 {
		return
	}
	cache.add(key, value)

	// Evict items from cache(s) if necessary.
	for {
		mainBytes := g.mainCache.bytes()
		hotBytes := g.hotCache.bytes()
		if mainBytes+hotBytes <= g.cacheBytes {
			return
		}

		// TODO(bradfitz): this is good-enough-for-now logic.
		// It should be something based on measurements and/or
		// respecting the costs of different resources.
		victim := &g.mainCache
		if hotBytes > mainBytes/8 {
			victim = &g.hotCache
		}
		victim.removeOldest()
	}
}

// CacheType represents a type of cache.
type CacheType int

const (
	// The MainCache is the cache for items that this peer is the
	// owner for.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)

// CacheStats returns stats about the provided cache within the group.
func (g *Group) CacheStats(which CacheType) CacheStats {
	switch which {
	case MainCache:
		return g.mainCache.stats()
	case HotCache:
		return g.hotCache.stats()
	default:
		return CacheStats{}
	}
}
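
// exampleReportStats is an illustrative sketch (hypothetical, not part of
// groupcache's API): reading the two per-cache views alongside the
// group-level counters, e.g. to export them to a metrics sink of the
// caller's choosing.
func exampleReportStats(g *Group) (main, hot CacheStats, gets, hits int64) {
	main = g.CacheStats(MainCache) // keys this process owns
	hot = g.CacheStats(HotCache)   // popular keys mirrored from peers
	gets = g.Stats.Gets.Get()
	hits = g.Stats.CacheHits.Get()
	return
}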

// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
	mu         sync.RWMutex
	nbytes     int64 // of all keys and values
	lru        *lru.Cache
	nhit, nget int64
	nevict     int64 // number of evictions
}

func (c *cache) stats() CacheStats {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return CacheStats{
		Bytes:     c.nbytes,
		Items:     c.itemsLocked(),
		Gets:      c.nget,
		Hits:      c.nhit,
		Evictions: c.nevict,
	}
}

func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		c.lru = &lru.Cache{
			OnEvicted: func(key lru.Key, value interface{}) {
				val := value.(ByteView)
				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
				c.nevict++
			},
		}
	}
	c.lru.Add(key, value)
	c.nbytes += int64(len(key)) + int64(value.Len())
}

func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nget++
	if c.lru == nil {
		return
	}
	vi, ok := c.lru.Get(key)
	if !ok {
		return
	}
	c.nhit++
	return vi.(ByteView), true
}

func (c *cache) removeOldest() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.RemoveOldest()
	}
}

func (c *cache) bytes() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nbytes
}

func (c *cache) items() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.itemsLocked()
}

func (c *cache) itemsLocked() int64 {
	if c.lru == nil {
		return 0
	}
	return int64(c.lru.Len())
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64((*int64)(i))
}

func (i *AtomicInt) String() string {
	return strconv.FormatInt(i.Get(), 10)
}

// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
	Bytes     int64
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}