Browse Source

proxy: serve range request from proxy cache if set serializable

mqliang 9 years ago
parent
commit
5676c5cf26
2 changed files with 115 additions and 3 deletions
  1. 99 0
      proxy/grpcproxy/cache/store.go
  2. 16 3
      proxy/grpcproxy/kv.go

+ 99 - 0
proxy/grpcproxy/cache/store.go

@@ -0,0 +1,99 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/golang/groupcache/lru"
+)
+
+var (
+	DefaultMaxEntries = 2048
+	ErrCompacted      = rpctypes.ErrGRPCCompacted
+)
+
+type Cache interface {
+	Add(req *pb.RangeRequest, resp *pb.RangeResponse)
+	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
+	Compact(revision int64)
+}
+
+// keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
+func keyFunc(req *pb.RangeRequest) string {
+	// TODO: use marshalTo to reduce allocation
+	b, err := req.Marshal()
+	if err != nil {
+		panic(err)
+	}
+	return string(b)
+}
+
+func NewCache(maxCacheEntries int) Cache {
+	return &cache{
+		lru: lru.New(maxCacheEntries),
+	}
+}
+
+// cache implements Cache
+type cache struct {
+	mu           sync.RWMutex
+	lru          *lru.Cache
+	compactedRev int64
+}
+
+// Add adds the response of a request to the cache if its revision is larger than the compacted revision of the cache.
+func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
+	key := keyFunc(req)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if req.Revision > c.compactedRev {
+		c.lru.Add(key, resp)
+	}
+}
+
+// Get looks up the caching response for a given request.
+// Get is also responsible for lazy eviction when accessing compacted entries.
+func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
+	key := keyFunc(req)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if req.Revision > c.compactedRev {
+		c.lru.Remove(key)
+		return nil, ErrCompacted
+	}
+
+	if resp, ok := c.lru.Get(key); ok {
+		return resp.(*pb.RangeResponse), nil
+	}
+	return nil, nil
+}
+
+// Compact invalidate all caching response before the given rev.
+// Replace with the invalidation is lazy. The actual removal happens when the entries is accessed.
+func (c *cache) Compact(revision int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if revision > c.compactedRev {
+		c.compactedRev = revision
+	}
+}

+ 16 - 3
proxy/grpcproxy/kv.go

@@ -17,22 +17,35 @@ package grpcproxy
 import (
 	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
+	"github.com/coreos/etcd/proxy/grpcproxy/cache"
 	"golang.org/x/net/context"
 )
 
 type kvProxy struct {
-	c *clientv3.Client
+	c     *clientv3.Client
+	cache cache.Cache
 }
 
 func NewKvProxy(c *clientv3.Client) *kvProxy {
 	return &kvProxy{
-		c: c,
+		c:     c,
+		cache: cache.NewCache(cache.DefaultMaxEntries),
 	}
 }
 
 func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	// if request set Serializable, serve it from local cache first
+	if r.Serializable {
+		if resp, err := p.cache.Get(r); err == nil || err == cache.ErrCompacted {
+			return resp, err
+		}
+	}
+
 	resp, err := p.c.Do(ctx, RangeRequestToOp(r))
+	if err != nil {
+		p.cache.Add(r, (*pb.RangeResponse)(resp.Get()))
+	}
+
 	return (*pb.RangeResponse)(resp.Get()), err
 }