groupcache.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. /*
  2. Copyright 2012 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package groupcache provides a data loading mechanism with caching
  14. // and de-duplication that works across a set of peer processes.
  15. //
  16. // Each data Get first consults its local cache, otherwise delegates
  17. // to the requested key's canonical owner, which then checks its cache
  18. // or finally gets the data. In the common case, many concurrent
  19. // cache misses across a set of peers for the same key result in just
  20. // one cache fill.
  21. package groupcache
  22. import (
  23. "errors"
  24. "math/rand"
  25. "strconv"
  26. "sync"
  27. "sync/atomic"
  28. pb "github.com/golang/groupcache/groupcachepb"
  29. "github.com/golang/groupcache/lru"
  30. "github.com/golang/groupcache/singleflight"
  31. )
  32. // A Getter loads data for a key.
  33. type Getter interface {
  34. // Get returns the value identified by key, populating dest.
  35. //
  36. // The returned data must be unversioned. That is, key must
  37. // uniquely describe the loaded data, without an implicit
  38. // current time, and without relying on cache expiration
  39. // mechanisms.
  40. Get(ctx Context, key string, dest Sink) error
  41. }
  42. // A GetterFunc implements Getter with a function.
  43. type GetterFunc func(ctx Context, key string, dest Sink) error
  44. func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
  45. return f(ctx, key, dest)
  46. }
  47. var (
  48. mu sync.RWMutex
  49. groups = make(map[string]*Group)
  50. initPeerServerOnce sync.Once
  51. initPeerServer func()
  52. )
  53. // GetGroup returns the named group previously created with NewGroup, or
  54. // nil if there's no such group.
  55. func GetGroup(name string) *Group {
  56. mu.RLock()
  57. g := groups[name]
  58. mu.RUnlock()
  59. return g
  60. }
  61. // NewGroup creates a coordinated group-aware Getter from a Getter.
  62. //
  63. // The returned Getter tries (but does not guarantee) to run only one
  64. // Get call at once for a given key across an entire set of peer
  65. // processes. Concurrent callers both in the local process and in
  66. // other processes receive copies of the answer once the original Get
  67. // completes.
  68. //
  69. // The group name must be unique for each getter.
  70. func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
  71. return newGroup(name, cacheBytes, getter, nil)
  72. }
  73. // If peers is nil, the peerPicker is called via a sync.Once to initialize it.
  74. func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
  75. if getter == nil {
  76. panic("nil Getter")
  77. }
  78. mu.Lock()
  79. defer mu.Unlock()
  80. initPeerServerOnce.Do(callInitPeerServer)
  81. if _, dup := groups[name]; dup {
  82. panic("duplicate registration of group " + name)
  83. }
  84. g := &Group{
  85. name: name,
  86. getter: getter,
  87. peers: peers,
  88. cacheBytes: cacheBytes,
  89. loadGroup: &singleflight.Group{},
  90. }
  91. if fn := newGroupHook; fn != nil {
  92. fn(g)
  93. }
  94. groups[name] = g
  95. return g
  96. }
  97. // newGroupHook, if non-nil, is called right after a new group is created.
  98. var newGroupHook func(*Group)
  99. // RegisterNewGroupHook registers a hook that is run each time
  100. // a group is created.
  101. func RegisterNewGroupHook(fn func(*Group)) {
  102. if newGroupHook != nil {
  103. panic("RegisterNewGroupHook called more than once")
  104. }
  105. newGroupHook = fn
  106. }
  107. // RegisterServerStart registers a hook that is run when the first
  108. // group is created.
  109. func RegisterServerStart(fn func()) {
  110. if initPeerServer != nil {
  111. panic("RegisterServerStart called more than once")
  112. }
  113. initPeerServer = fn
  114. }
  115. func callInitPeerServer() {
  116. if initPeerServer != nil {
  117. initPeerServer()
  118. }
  119. }
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
//
// Groups are created with NewGroup and looked up by name with GetGroup.
// Get serves values from mainCache or hotCache when present; otherwise it
// loads them (from the owning peer or the local getter) through loadGroup.
//
// NOTE(review): the Stats counters are updated with 64-bit sync/atomic
// operations (see AtomicInt). On 32-bit platforms those operations require
// 64-bit-aligned operands, so confirm alignment before reordering fields.
type Group struct {
	name   string
	getter Getter // invoked by getLocally to load a value into a Sink
	// peersOnce guards the lazy initialization of peers performed by Get.
	peersOnce sync.Once
	// peers picks the peer to ask for a key. It is nil until initPeers
	// runs when the group was created without an explicit PeerPicker.
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size; <= 0 disables caching
	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache
	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer. Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache
	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup
	// Stats are statistics on the group.
	Stats Stats
}
  149. // flightGroup is defined as an interface which flightgroup.Group
  150. // satisfies. We define this so that we may test with an alternate
  151. // implementation.
  152. type flightGroup interface {
  153. // Done is called when Do is done.
  154. Do(key string, fn func() (interface{}, error)) (interface{}, error)
  155. }
// Stats are per-group statistics. Every field is an AtomicInt, so the
// counters are read and updated with atomic operations.
type Stats struct {
	Gets      AtomicInt // any Get request, including from peers
	CacheHits AtomicInt // either cache was good (also counts hits found by load's re-check)
	PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
	// PeerErrors counts failed fetches from a peer; load then falls back
	// to the local getter.
	PeerErrors AtomicInt
	// Loads counts Get calls that missed the cache and entered load.
	// Because CacheHits is also bumped by load's re-check, this is only
	// approximately (gets - cacheHits).
	Loads          AtomicInt // (gets - cacheHits)
	LoadsDeduped   AtomicInt // after singleflight
	LocalLoads     AtomicInt // total good local loads
	LocalLoadErrs  AtomicInt // total bad local loads
	ServerRequests AtomicInt // gets that came over the network from peers (not updated in this file; presumably by the peer server — verify)
}
  168. // Name returns the name of the group.
  169. func (g *Group) Name() string {
  170. return g.name
  171. }
  172. func (g *Group) initPeers() {
  173. if g.peers == nil {
  174. g.peers = getPeers()
  175. }
  176. }
  177. func (g *Group) Get(ctx Context, key string, dest Sink) error {
  178. g.peersOnce.Do(g.initPeers)
  179. g.Stats.Gets.Add(1)
  180. if dest == nil {
  181. return errors.New("groupcache: nil dest Sink")
  182. }
  183. value, cacheHit := g.lookupCache(key)
  184. if cacheHit {
  185. g.Stats.CacheHits.Add(1)
  186. return setSinkView(dest, value)
  187. }
  188. // Optimization to avoid double unmarshalling or copying: keep
  189. // track of whether the dest was already populated. One caller
  190. // (if local) will set this; the losers will not. The common
  191. // case will likely be one caller.
  192. destPopulated := false
  193. value, destPopulated, err := g.load(ctx, key, dest)
  194. if err != nil {
  195. return err
  196. }
  197. if destPopulated {
  198. return nil
  199. }
  200. return setSinkView(dest, value)
  201. }
// load loads key either by invoking the getter locally or by sending it to another machine.
//
// Calls for the same key are funneled through g.loadGroup.Do. Callers whose
// load calls overlap share a single execution of the callback below and
// all receive its result.
//
// destPopulated is true only for the caller whose own callback execution
// loaded the value via getLocally, which wrote it into that caller's dest.
// Every other caller sees false and must copy value into its own dest:
// callers that shared another caller's result, and callers whose value
// came from the cache re-check or from a peer. On error, value is the
// zero ByteView.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently. It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls. An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially. If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		// Try the peer chosen for this key first. If no peer is picked, or
		// the peer fetch fails, fall through to loading locally.
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez? It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			// The nil result is never type-asserted: viewi is only
			// converted below when err == nil.
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}
  261. func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
  262. err := g.getter.Get(ctx, key, dest)
  263. if err != nil {
  264. return ByteView{}, err
  265. }
  266. return dest.view()
  267. }
  268. func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
  269. req := &pb.GetRequest{
  270. Group: &g.name,
  271. Key: &key,
  272. }
  273. res := &pb.GetResponse{}
  274. err := peer.Get(ctx, req, res)
  275. if err != nil {
  276. return ByteView{}, err
  277. }
  278. value := ByteView{b: res.Value}
  279. // TODO(bradfitz): use res.MinuteQps or something smart to
  280. // conditionally populate hotCache. For now just do it some
  281. // percentage of the time.
  282. if rand.Intn(10) == 0 {
  283. g.populateCache(key, value, &g.hotCache)
  284. }
  285. return value, nil
  286. }
  287. func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
  288. if g.cacheBytes <= 0 {
  289. return
  290. }
  291. value, ok = g.mainCache.get(key)
  292. if ok {
  293. return
  294. }
  295. value, ok = g.hotCache.get(key)
  296. return
  297. }
  298. func (g *Group) populateCache(key string, value ByteView, cache *cache) {
  299. if g.cacheBytes <= 0 {
  300. return
  301. }
  302. cache.add(key, value)
  303. // Evict items from cache(s) if necessary.
  304. for {
  305. mainBytes := g.mainCache.bytes()
  306. hotBytes := g.hotCache.bytes()
  307. if mainBytes+hotBytes <= g.cacheBytes {
  308. return
  309. }
  310. // TODO(bradfitz): this is good-enough-for-now logic.
  311. // It should be something based on measurements and/or
  312. // respecting the costs of different resources.
  313. victim := &g.mainCache
  314. if hotBytes > mainBytes/8 {
  315. victim = &g.hotCache
  316. }
  317. victim.removeOldest()
  318. }
  319. }
  320. // CacheType represents a type of cache.
  321. type CacheType int
  322. const (
  323. // The MainCache is the cache for items that this peer is the
  324. // owner for.
  325. MainCache CacheType = iota + 1
  326. // The HotCache is the cache for items that seem popular
  327. // enough to replicate to this node, even though it's not the
  328. // owner.
  329. HotCache
  330. )
  331. // CacheStats returns stats about the provided cache within the group.
  332. func (g *Group) CacheStats(which CacheType) CacheStats {
  333. switch which {
  334. case MainCache:
  335. return g.mainCache.stats()
  336. case HotCache:
  337. return g.hotCache.stats()
  338. default:
  339. return CacheStats{}
  340. }
  341. }
  342. // cache is a wrapper around an *lru.Cache that adds synchronization,
  343. // makes values always be ByteView, and counts the size of all keys and
  344. // values.
  345. type cache struct {
  346. mu sync.RWMutex
  347. nbytes int64 // of all keys and values
  348. lru *lru.Cache
  349. nhit, nget int64
  350. nevict int64 // number of evictions
  351. }
  352. func (c *cache) stats() CacheStats {
  353. c.mu.RLock()
  354. defer c.mu.RUnlock()
  355. return CacheStats{
  356. Bytes: c.nbytes,
  357. Items: c.itemsLocked(),
  358. Gets: c.nget,
  359. Hits: c.nhit,
  360. Evictions: c.nevict,
  361. }
  362. }
  363. func (c *cache) add(key string, value ByteView) {
  364. c.mu.Lock()
  365. defer c.mu.Unlock()
  366. if c.lru == nil {
  367. c.lru = &lru.Cache{
  368. OnEvicted: func(key lru.Key, value interface{}) {
  369. val := value.(ByteView)
  370. c.nbytes -= int64(len(key.(string))) + int64(val.Len())
  371. c.nevict++
  372. },
  373. }
  374. }
  375. c.lru.Add(key, value)
  376. c.nbytes += int64(len(key)) + int64(value.Len())
  377. }
  378. func (c *cache) get(key string) (value ByteView, ok bool) {
  379. c.mu.Lock()
  380. defer c.mu.Unlock()
  381. c.nget++
  382. if c.lru == nil {
  383. return
  384. }
  385. vi, ok := c.lru.Get(key)
  386. if !ok {
  387. return
  388. }
  389. c.nhit++
  390. return vi.(ByteView), true
  391. }
  392. func (c *cache) removeOldest() {
  393. c.mu.Lock()
  394. defer c.mu.Unlock()
  395. if c.lru != nil {
  396. c.lru.RemoveOldest()
  397. }
  398. }
  399. func (c *cache) bytes() int64 {
  400. c.mu.RLock()
  401. defer c.mu.RUnlock()
  402. return c.nbytes
  403. }
  404. func (c *cache) items() int64 {
  405. c.mu.RLock()
  406. defer c.mu.RUnlock()
  407. return c.itemsLocked()
  408. }
  409. func (c *cache) itemsLocked() int64 {
  410. if c.lru == nil {
  411. return 0
  412. }
  413. return int64(c.lru.Len())
  414. }
  415. // An AtomicInt is an int64 to be accessed atomically.
  416. type AtomicInt int64
  417. // Add atomically adds n to i.
  418. func (i *AtomicInt) Add(n int64) {
  419. atomic.AddInt64((*int64)(i), n)
  420. }
  421. // Get atomically gets the value of i.
  422. func (i *AtomicInt) Get() int64 {
  423. return atomic.LoadInt64((*int64)(i))
  424. }
  425. func (i *AtomicInt) String() string {
  426. return strconv.FormatInt(i.Get(), 10)
  427. }
// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
	Bytes     int64 // total len(key)+value.Len() of the entries held
	Items     int64 // number of entries held
	Gets      int64 // lookups performed against this cache
	Hits      int64 // lookups that found their key
	Evictions int64 // entries removed through the LRU's eviction callback
}