Ver Fonte

v3rpc: return leader loss error if lease stream is canceled

Canceling the stream won't cancel the receive since it's using the internal
grpc context, not the one assigned by etcd.
Anthony Romano há 8 anos atrás
pai
commit
833769f59f
1 ficheiros alterados com 19 adições e 1 exclusões
  1. 19 1
      etcdserver/api/v3rpc/lease.go

+ 19 - 1
etcdserver/api/v3rpc/lease.go

@@ -18,6 +18,7 @@ import (
 	"io"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"golang.org/x/net/context"
@@ -67,7 +68,24 @@ func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLi
 	return resp, nil
 }
 
-func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
+func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
+	errc := make(chan error, 1)
+	go func() {
+		errc <- ls.leaseKeepAlive(stream)
+	}()
+	select {
+	case err = <-errc:
+	case <-stream.Context().Done():
+		// the only server-side cancellation is noleader for now.
+		err = stream.Context().Err()
+		if err == context.Canceled {
+			err = rpctypes.ErrGRPCNoLeader
+		}
+	}
+	return err
+}
+
+func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
 	for {
 		req, err := stream.Recv()
 		if err == io.EOF {