
grpcproxy: use ccache for key cache

groupcache's LRU requires a write lock even for lookups and has no way
to expire keys; ccache supports read-locked gets and per-entry TTLs.

Also removes the key count metric, since there's no way to efficiently
calculate it using ccache.
Anthony Romano · 9 years ago
parent commit 9fa6c95054
3 changed files with 19 additions and 31 deletions
  1. proxy/grpcproxy/cache/store.go (+19 -17)
  2. proxy/grpcproxy/kv.go (+0 -7)
  3. proxy/grpcproxy/metrics.go (+0 -7)
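For context, a minimal standalone sketch of the karlseguin/ccache calls this patch leans on (Configure().MaxSize, Set with a TTL, Get returning an item, Delete); the key names, values, and sizes below are made up for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/karlseguin/ccache"
)

func main() {
	// Bound the cache by entry count, as NewCache(maxCacheEntries) does below.
	c := ccache.New(ccache.Configure().MaxSize(1024))

	// Unlike groupcache's lru, every entry carries a TTL; the patch uses a
	// short TTL for current-revision reads and a longer one for historic reads.
	c.Set("current-read", "v1", time.Minute)
	c.Set("historic-read", "v2", time.Hour)

	// Get returns a *ccache.Item (nil on miss); the cached value comes back
	// through Value(), which is why store.go type-asserts to *pb.RangeResponse.
	if item := c.Get("current-read"); item != nil {
		fmt.Println(item.Value().(string))
	}

	// Invalidation maps onto Delete, replacing lru.Remove.
	c.Delete("historic-read")
}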

+ 19 - 17
proxy/grpcproxy/cache/store.go

@@ -17,11 +17,13 @@ package cache
 import (
 	"errors"
 	"sync"
+	"time"
+
+	"github.com/karlseguin/ccache"
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/adt"
-	"github.com/golang/groupcache/lru"
 )
 
 var (
@@ -29,12 +31,14 @@ var (
 	ErrCompacted      = rpctypes.ErrGRPCCompacted
 )
 
+const defaultHistoricTTL = time.Hour
+const defaultCurrentTTL = time.Minute
+
 type Cache interface {
 	Add(req *pb.RangeRequest, resp *pb.RangeResponse)
 	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
 	Compact(revision int64)
 	Invalidate(key []byte, endkey []byte)
-	Size() int
 }
 
 // keyFunc returns the key of a request, which is used to look up its cached response in the cache.
@@ -49,7 +53,7 @@ func keyFunc(req *pb.RangeRequest) string {
 
 func NewCache(maxCacheEntries int) Cache {
 	return &cache{
-		lru:          lru.New(maxCacheEntries),
+		lru:          ccache.New(ccache.Configure().MaxSize(int64(maxCacheEntries))),
 		compactedRev: -1,
 	}
 }
@@ -57,7 +61,7 @@ func NewCache(maxCacheEntries int) Cache {
 // cache implements Cache
 type cache struct {
 	mu  sync.RWMutex
-	lru *lru.Cache
+	lru *ccache.Cache
 
 	// a reverse index for cache invalidation
 	cachedRanges adt.IntervalTree
@@ -73,7 +77,11 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
 	defer c.mu.Unlock()
 
 	if req.Revision > c.compactedRev {
-		c.lru.Add(key, resp)
+		if req.Revision == 0 {
+			c.lru.Set(key, resp, defaultCurrentTTL)
+		} else {
+			c.lru.Set(key, resp, defaultHistoricTTL)
+		}
 	}
 	// we do not need to invalidate a request with a revision specified.
 	// so we do not need to add it into the reverse index.
@@ -105,16 +113,16 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
 func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
 	key := keyFunc(req)
 
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 
 	if req.Revision < c.compactedRev {
-		c.lru.Remove(key)
+		c.lru.Delete(key)
 		return nil, ErrCompacted
 	}
 
-	if resp, ok := c.lru.Get(key); ok {
-		return resp.(*pb.RangeResponse), nil
+	if item := c.lru.Get(key); item != nil {
+		return item.Value().(*pb.RangeResponse), nil
 	}
 	return nil, errors.New("not exist")
 }
@@ -138,7 +146,7 @@ func (c *cache) Invalidate(key, endkey []byte) {
 	for _, iv := range ivs {
 		keys := iv.Val.([]string)
 		for _, key := range keys {
-			c.lru.Remove(key)
+			c.lru.Delete(key)
 		}
 	}
 	// delete after removing all keys since it is destructive to 'ivs'
@@ -155,9 +163,3 @@ func (c *cache) Compact(revision int64) {
 		c.compactedRev = revision
 	}
 }
-
-func (c *cache) Size() int {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	return c.lru.Len()
-}
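
With Size() gone from the Cache interface, a caller touches only Add, Get, Invalidate, and Compact; a rough usage sketch (hypothetical keys and sizes, assuming the etcd proxy packages at this revision) follows:

package main

import (
	"fmt"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/proxy/grpcproxy/cache"
)

func main() {
	// The constructor still takes a maximum entry count; there is no longer a
	// Size() for the proxy's cache_keys_total gauge to read.
	c := cache.NewCache(2048)

	req := &pb.RangeRequest{Key: []byte("foo")}
	c.Add(req, &pb.RangeResponse{})

	if resp, err := c.Get(req); err == nil {
		fmt.Println(resp.Count)
	}

	// A write to "foo" invalidates the cached range; after Compact, requests
	// below the compacted revision are rejected with ErrCompacted on Get.
	c.Invalidate([]byte("foo"), nil)
	c.Compact(5)
}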

+ 0 - 7
proxy/grpcproxy/kv.go

@@ -58,14 +58,12 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 	req.Serializable = true
 	gresp := (*pb.RangeResponse)(resp.Get())
 	p.cache.Add(&req, gresp)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	return gresp, nil
 }
 
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 	p.cache.Invalidate(r.Key, nil)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	resp, err := p.kv.Do(ctx, PutRequestToOp(r))
 	return (*pb.PutResponse)(resp.Put()), err
@@ -73,7 +71,6 @@ func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e
 
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 	p.cache.Invalidate(r.Key, r.RangeEnd)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	resp, err := p.kv.Do(ctx, DelRequestToOp(r))
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
@@ -129,8 +126,6 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
 		p.txnToCache(r.Failure, resp.Responses)
 	}
 
-	cacheKeys.Set(float64(p.cache.Size()))
-
 	return (*pb.TxnResponse)(resp), nil
 }
 
@@ -145,8 +140,6 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com
 		p.cache.Compact(r.Revision)
 	}
 
-	cacheKeys.Set(float64(p.cache.Size()))
-
 	return (*pb.CompactionResponse)(resp), err
 }
 

+ 0 - 7
proxy/grpcproxy/metrics.go

@@ -29,12 +29,6 @@ var (
 		Name:      "events_coalescing_total",
 		Help:      "Total number of events coalescing",
 	})
-	cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "etcd",
-		Subsystem: "grpc_proxy",
-		Name:      "cache_keys_total",
-		Help:      "Total number of keys/ranges cached",
-	})
 	cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "etcd",
 		Subsystem: "grpc_proxy",
@@ -52,7 +46,6 @@ var (
 func init() {
 	prometheus.MustRegister(watchersCoalescing)
 	prometheus.MustRegister(eventsCoalescing)
-	prometheus.MustRegister(cacheKeys)
 	prometheus.MustRegister(cacheHits)
 	prometheus.MustRegister(cachedMisses)
 }