Browse Source

Merge pull request #6827 from heyitsanthony/proxy-txn-invalidate

grpcproxy: update cache based on txn response
Anthony Romano 9 years ago
parent
commit
4a1e89150b
2 changed files with 24 additions and 2 deletions
  1. 2 2
      proxy/grpcproxy/cache/store.go
  2. 22 0
      proxy/grpcproxy/kv.go

+ 2 - 2
proxy/grpcproxy/cache/store.go

@@ -134,14 +134,14 @@ func (c *cache) Invalidate(key, endkey []byte) {
 	}
 
 	ivs = c.cachedRanges.Stab(ivl)
-	c.cachedRanges.Delete(ivl)
-
 	for _, iv := range ivs {
 		keys := iv.Val.([]string)
 		for _, key := range keys {
 			c.lru.Remove(key)
 		}
 	}
+	// delete after removing all keys since it is destructive to 'ivs'
+	c.cachedRanges.Delete(ivl)
 }
 
 // Compact invalidate all caching response before the given rev.

+ 22 - 0
proxy/grpcproxy/kv.go

@@ -70,6 +70,22 @@ func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 }
 
+func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) {
+	for i := range resps {
+		switch tv := resps[i].Response.(type) {
+		case *pb.ResponseOp_ResponsePut:
+			p.cache.Invalidate(reqs[i].GetRequestPut().Key, nil)
+		case *pb.ResponseOp_ResponseDeleteRange:
+			rdr := reqs[i].GetRequestDeleteRange()
+			p.cache.Invalidate(rdr.Key, rdr.RangeEnd)
+		case *pb.ResponseOp_ResponseRange:
+			req := reqs[i].GetRequestRange()
+			req.Serializable = true
+			p.cache.Add(req, tv.ResponseRange)
+		}
+	}
+}
+
 func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 	txn := p.kv.Txn(ctx)
 	cmps := make([]clientv3.Cmp, len(r.Compare))
@@ -97,6 +113,12 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
 	for _, cmp := range r.Compare {
 		p.cache.Invalidate(cmp.Key, nil)
 	}
+	// update any fetched keys
+	if resp.Succeeded {
+		p.txnToCache(r.Success, resp.Responses)
+	} else {
+		p.txnToCache(r.Failure, resp.Responses)
+	}
 	return (*pb.TxnResponse)(resp), nil
 }