Browse Source

grpc-proxy: invalidate cache entries when there is a put/delete

Xiang Li 9 years ago
parent
commit
c8bbb8c53e
2 changed files with 64 additions and 5 deletions
  1. 57 2
      proxy/grpcproxy/cache/store.go
  2. 7 3
      proxy/grpcproxy/kv.go

+ 57 - 2
proxy/grpcproxy/cache/store.go

@@ -20,6 +20,7 @@ import (
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/adt"
 	"github.com/golang/groupcache/lru"
 )
 
@@ -32,6 +33,7 @@ type Cache interface {
 	Add(req *pb.RangeRequest, resp *pb.RangeResponse)
 	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
 	Compact(revision int64)
+	Invalidate(key []byte, endkey []byte)
 }
 
 // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
@@ -53,8 +55,12 @@ func NewCache(maxCacheEntries int) Cache {
 
 // cache implements Cache
 type cache struct {
-	mu           sync.RWMutex
-	lru          *lru.Cache
+	mu  sync.RWMutex
+	lru *lru.Cache
+
+	// a reverse index for cache invalidation
+	cachedRanges adt.IntervalTree
+
 	compactedRev int64
 }
 
@@ -68,6 +74,29 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
 	if req.Revision > c.compactedRev {
 		c.lru.Add(key, resp)
 	}
+	// we do not need to invalidate a request with a revision specified.
+	// so we do not need to add it into the reverse index.
+	if req.Revision != 0 {
+		return
+	}
+
+	var (
+		iv  *adt.IntervalValue
+		ivl adt.Interval
+	)
+	if len(req.RangeEnd) != 0 {
+		ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd))
+	} else {
+		ivl = adt.NewStringAffinePoint(string(req.Key))
+	}
+
+	iv = c.cachedRanges.Find(ivl)
+
+	if iv == nil {
+		c.cachedRanges.Insert(ivl, []string{key})
+	} else {
+		iv.Val = append(iv.Val.([]string), key)
+	}
 }
 
 // Get looks up the caching response for a given request.
@@ -89,6 +118,32 @@ func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
 	return nil, errors.New("not exist")
 }
 
+// Invalidate invalidates the cache entries that intersecting with the given range from key to endkey.
+func (c *cache) Invalidate(key, endkey []byte) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var (
+		ivs []*adt.IntervalValue
+		ivl adt.Interval
+	)
+	if len(endkey) == 0 {
+		ivl = adt.NewStringAffinePoint(string(key))
+	} else {
+		ivl = adt.NewStringAffineInterval(string(key), string(endkey))
+	}
+
+	ivs = c.cachedRanges.Stab(ivl)
+	c.cachedRanges.Delete(ivl)
+
+	for _, iv := range ivs {
+		keys := iv.Val.([]string)
+		for _, key := range keys {
+			c.lru.Remove(key)
+		}
+	}
+}
+
 // Compact invalidate all caching response before the given rev.
 // Replace with the invalidation is lazy. The actual removal happens when the entries is accessed.
 func (c *cache) Compact(revision int64) {

+ 7 - 3
proxy/grpcproxy/kv.go

@@ -35,7 +35,6 @@ func NewKvProxy(c *clientv3.Client) pb.KVServer {
 }
 
 func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
-	// if request set Serializable, serve it from local cache first
 	if r.Serializable {
 		resp, err := p.cache.Get(r)
 		switch err {
@@ -51,17 +50,22 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 		return nil, err
 	}
 
-	p.cache.Add(r, (*pb.RangeResponse)(resp.Get()))
+	// cache linearizable as serializable
+	r.Serializable = true
+	gresp := (*pb.RangeResponse)(resp.Get())
+	p.cache.Add(r, gresp)
 
-	return (*pb.RangeResponse)(resp.Get()), nil
+	return gresp, nil
 }
 
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	p.cache.Invalidate(r.Key, nil)
 	resp, err := p.kv.Do(ctx, PutRequestToOp(r))
 	return (*pb.PutResponse)(resp.Put()), err
 }
 
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	p.cache.Invalidate(r.Key, r.RangeEnd)
 	resp, err := p.kv.Do(ctx, DelRequestToOp(r))
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 }