Pārlūkot izejas kodu

Merge pull request #5206 from xiang90/lease_header

v3rpc: fill lease header
Xiang Li 9 gadi atpakaļ
vecāks
revīzija
afd2cc7373
3 mainītis faili ar 24 papildinājumiem un 7 dzēšanām
  1. 3 0
      etcdserver/api/v3rpc/header.go
  2. 18 6
      etcdserver/api/v3rpc/lease.go
  3. 3 1
      etcdserver/apply.go

+ 3 - 0
etcdserver/api/v3rpc/header.go

@@ -40,4 +40,7 @@ func (h *header) fill(rh *pb.ResponseHeader) {
 	rh.ClusterId = uint64(h.clusterID)
 	rh.MemberId = uint64(h.memberID)
 	rh.RaftTerm = h.raftTimer.Term()
+	if rh.Revision == 0 {
+		rh.Revision = h.rev()
+	}
 }

+ 18 - 6
etcdserver/api/v3rpc/lease.go

@@ -25,11 +25,12 @@ import (
 )
 
 type LeaseServer struct {
-	le etcdserver.Lessor
+	hdr header
+	le  etcdserver.Lessor
 }
 
-func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer {
-	return &LeaseServer{le: le}
+func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
+	return &LeaseServer{le: s, hdr: newHeader(s)}
 }
 
 func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
@@ -37,15 +38,17 @@ func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest)
 	if err == lease.ErrLeaseExists {
 		return nil, rpctypes.ErrLeaseExist
 	}
+	ls.hdr.fill(resp.Header)
 	return resp, err
 }
 
 func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
-	r, err := ls.le.LeaseRevoke(ctx, rr)
+	resp, err := ls.le.LeaseRevoke(ctx, rr)
 	if err != nil {
 		return nil, rpctypes.ErrLeaseNotFound
 	}
-	return r, nil
+	ls.hdr.fill(resp.Header)
+	return resp, nil
 }
 
 func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
@@ -58,6 +61,15 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 			return err
 		}
 
+		// Create header before we sent out the renew request.
+		// This can make sure that the revision is strictly smaller or equal to
+		// when the keepalive happened at the local server (when the local server is the leader)
+		// or remote leader.
+		// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
+		// at rev 4.
+		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
+		ls.hdr.fill(resp.Header)
+
 		ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
 		if err == lease.ErrLeaseNotFound {
 			err = nil
@@ -68,7 +80,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 			return err
 		}
 
-		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl}
+		resp.TTL = ttl
 		err = stream.Send(resp)
 		if err != nil {
 			return err

+ 3 - 1
etcdserver/apply.go

@@ -406,13 +406,15 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR
 	if err == nil {
 		resp.ID = int64(l.ID)
 		resp.TTL = l.TTL
+		resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()}
 	}
+
 	return resp, err
 }
 
 func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
 	err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
-	return &pb.LeaseRevokeResponse{}, err
+	return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err
 }
 
 func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {