
grpcproxy: add lease coalescing support

fanmin shi, 8 years ago
parent commit ba52bd07ba
2 changed files with 410 additions and 33 deletions
  1. proxy/grpcproxy/lease.go (+313, -33)
  2. proxy/grpcproxy/lease_client_adapter.go (+97, -0)

proxy/grpcproxy/lease.go (+313, -33)

@@ -15,73 +15,353 @@
 package grpcproxy
 
 import (
+	"io"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+
 	"golang.org/x/net/context"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 type leaseProxy struct {
-	client *clientv3.Client
+	// leaseClient handles requests from LeaseGrant() that require a lease ID.
+	leaseClient pb.LeaseClient
+
+	lessor clientv3.Lease
+
+	ctx context.Context
+
+	leader *leader
+
+	// mu protects registering outstanding leaseProxyStreams with wg.
+	mu sync.RWMutex
+
+	// wg waits until all outstanding leaseProxyStreams quit.
+	wg sync.WaitGroup
 }
 
-func NewLeaseProxy(c *clientv3.Client) pb.LeaseServer {
-	return &leaseProxy{
-		client: c,
+func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
+	lp := &leaseProxy{
+		leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
+		lessor:      c.Lease,
+		ctx:         cctx,
+		leader:      newLeader(c.Ctx(), c.Watcher),
 	}
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		<-lp.leader.stopNotify()
+		lp.mu.Lock()
+		select {
+		case <-lp.ctx.Done():
+		case <-lp.leader.disconnectNotify():
+			cancel()
+		}
+		<-lp.ctx.Done()
+		lp.mu.Unlock()
+		lp.wg.Wait()
+	}()
+	return lp, ch
 }
 
 func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseGrant(ctx, cr)
+	rp, err := lp.leaseClient.LeaseGrant(ctx, cr)
+	if err != nil {
+		return nil, err
+	}
+	lp.leader.gotLeader()
+	return rp, nil
 }
 
 func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseRevoke(ctx, rr)
+	r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
+	if err != nil {
+		return nil, err
+	}
+	lp.leader.gotLeader()
+	return (*pb.LeaseRevokeResponse)(r), nil
 }
 
 func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseTimeToLive(ctx, rr)
+	var (
+		r   *clientv3.LeaseTimeToLiveResponse
+		err error
+	)
+	if rr.Keys {
+		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID), clientv3.WithAttachedKeys())
+	} else {
+		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID))
+	}
+	if err != nil {
+		return nil, err
+	}
+	rp := &pb.LeaseTimeToLiveResponse{
+		Header:     r.ResponseHeader,
+		ID:         int64(r.ID),
+		TTL:        r.TTL,
+		GrantedTTL: r.GrantedTTL,
+		Keys:       r.Keys,
+	}
+	return rp, err
 }
 
 func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
-	conn := lp.client.ActiveConnection()
+	lp.mu.Lock()
+	select {
+	case <-lp.ctx.Done():
+		lp.mu.Unlock()
+		return lp.ctx.Err()
+	default:
+		lp.wg.Add(1)
+	}
+	lp.mu.Unlock()
+
 	ctx, cancel := context.WithCancel(stream.Context())
-	lc, err := pb.NewLeaseClient(conn).LeaseKeepAlive(ctx)
-	if err != nil {
-		cancel()
-		return err
+	lps := leaseProxyStream{
+		stream:          stream,
+		lessor:          lp.lessor,
+		keepAliveLeases: make(map[int64]*atomicCounter),
+		respc:           make(chan *pb.LeaseKeepAliveResponse),
+		ctx:             ctx,
+		cancel:          cancel,
 	}
 
-	go func() {
-		// Cancel the context attached to lc to unblock lc.Recv when
-		// this routine returns on error.
-		defer cancel()
-
-		for {
-			// stream.Recv will be unblock when the loop in the parent routine
-			// returns on error.
-			rr, err := stream.Recv()
-			if err != nil {
-				return
-			}
-			err = lc.Send(rr)
-			if err != nil {
-				return
+	errc := make(chan error, 2)
+
+	var lostLeaderC <-chan struct{}
+	if md, ok := metadata.FromContext(stream.Context()); ok {
+		v := md[rpctypes.MetadataRequireLeaderKey]
+		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+			lostLeaderC = lp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				lp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
 			}
 		}
+	}
+	stopc := make(chan struct{}, 3)
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		if err := lps.recvLoop(); err != nil {
+			errc <- err
+		}
 	}()
 
-	for {
-		rr, err := lc.Recv()
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		if err := lps.sendLoop(); err != nil {
+			errc <- err
+		}
+	}()
+
+	// tears down LeaseKeepAlive stream if leader goes down or entire leaseProxy is terminated.
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		select {
+		case <-lostLeaderC:
+		case <-ctx.Done():
+		case <-lp.ctx.Done():
+		}
+	}()
+
+	var err error
+	select {
+	case <-stopc:
+		stopc <- struct{}{}
+	case err = <-errc:
+	}
+	cancel()
+
+	// recv/send may only shut down after the function exits;
+	// this goroutine notifies the lease proxy that the stream is through
+	go func() {
+		<-stopc
+		<-stopc
+		<-stopc
+		lps.close()
+		close(errc)
+		lp.wg.Done()
+	}()
+
+	select {
+	case <-lostLeaderC:
+		return rpctypes.ErrNoLeader
+	case <-lp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
+	default:
 		if err != nil {
 			return err
 		}
-		err = stream.Send(rr)
+		return ctx.Err()
+	}
+}
+
+type leaseProxyStream struct {
+	stream pb.Lease_LeaseKeepAliveServer
+
+	lessor clientv3.Lease
+	// wg tracks keepAliveLoop goroutines
+	wg sync.WaitGroup
+	// mu protects keepAliveLeases
+	mu sync.RWMutex
+	// keepAliveLeases tracks, per lease, how many outstanding keepalive requests still need responses.
+	keepAliveLeases map[int64]*atomicCounter
+	// respc receives lease keepalive responses from etcd backend
+	respc chan *pb.LeaseKeepAliveResponse
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func (lps *leaseProxyStream) recvLoop() error {
+	for {
+		rr, err := lps.stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
 		if err != nil {
 			return err
 		}
+		lps.mu.Lock()
+		neededResps, ok := lps.keepAliveLeases[rr.ID]
+		if !ok {
+			neededResps = &atomicCounter{}
+			lps.keepAliveLeases[rr.ID] = neededResps
+			lps.wg.Add(1)
+			go func() {
+				defer lps.wg.Done()
+				if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
+					lps.cancel()
+				}
+			}()
+		}
+		neededResps.add(1)
+		lps.mu.Unlock()
 	}
 }
+
+func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
+	cctx, ccancel := context.WithCancel(lps.ctx)
+	defer ccancel()
+	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
+	if err != nil {
+		return err
+	}
+	// ticker fires when the loop hasn't received a keepalive within the lease TTL
+	var ticker <-chan time.Time
+	for {
+		select {
+		case <-ticker:
+			lps.mu.Lock()
+			// if there are outstanding keepAlive reqs at the moment the ticker fires,
+			// don't close keepAliveLoop(); let it continue to process the KeepAlive reqs.
+			if neededResps.get() > 0 {
+				lps.mu.Unlock()
+				ticker = nil
+				continue
+			}
+			delete(lps.keepAliveLeases, leaseID)
+			lps.mu.Unlock()
+			return nil
+		case rp, ok := <-respc:
+			if !ok {
+				lps.mu.Lock()
+				delete(lps.keepAliveLeases, leaseID)
+				lps.mu.Unlock()
+				if neededResps.get() == 0 {
+					return nil
+				}
+				ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
+				if err != nil {
+					return err
+				}
+				r := &pb.LeaseKeepAliveResponse{
+					Header: ttlResp.ResponseHeader,
+					ID:     int64(ttlResp.ID),
+					TTL:    ttlResp.TTL,
+				}
+				for neededResps.get() > 0 {
+					select {
+					case lps.respc <- r:
+						neededResps.add(-1)
+					case <-lps.ctx.Done():
+						return nil
+					}
+				}
+				return nil
+			}
+			if neededResps.get() == 0 {
+				continue
+			}
+			ticker = time.After(time.Duration(rp.TTL) * time.Second)
+			r := &pb.LeaseKeepAliveResponse{
+				Header: rp.ResponseHeader,
+				ID:     int64(rp.ID),
+				TTL:    rp.TTL,
+			}
+			lps.replyToClient(r, neededResps)
+		}
+	}
+}
+
+func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) {
+	timer := time.After(500 * time.Millisecond)
+	for neededResps.get() > 0 {
+		select {
+		case lps.respc <- r:
+			neededResps.add(-1)
+		case <-timer:
+			return
+		case <-lps.ctx.Done():
+			return
+		}
+	}
+}
+
+func (lps *leaseProxyStream) sendLoop() error {
+	for {
+		select {
+		case lrp, ok := <-lps.respc:
+			if !ok {
+				return nil
+			}
+			if err := lps.stream.Send(lrp); err != nil {
+				return err
+			}
+		case <-lps.ctx.Done():
+			return lps.ctx.Err()
+		}
+	}
+}
+
+func (lps *leaseProxyStream) close() {
+	lps.cancel()
+	lps.wg.Wait()
+	// only close the respc channel once all the keepAliveLoop() goroutines have finished;
+	// this ensures those goroutines don't send a response on a closed channel
+	close(lps.respc)
+}
+
+type atomicCounter struct {
+	counter int64
+}
+
+func (ac *atomicCounter) add(delta int64) {
+	atomic.AddInt64(&ac.counter, delta)
+}
+
+func (ac *atomicCounter) get() int64 {
+	return atomic.LoadInt64(&ac.counter)
+}
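
NewLeaseProxy now returns both the pb.LeaseServer implementation and a channel that is closed once the proxy shuts down (after the leader is lost or the client context is canceled). Below is a minimal, hypothetical wiring sketch of how a caller might consume the new two-value signature; the endpoint addresses and server setup are illustrative assumptions, not part of this commit.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	"github.com/coreos/etcd/clientv3"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/proxy/grpcproxy"
)

func main() {
	// Backing client for the proxy (endpoint is illustrative).
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// leaseDonec is closed when the proxy loses its leader or its context ends.
	leaseServer, leaseDonec := grpcproxy.NewLeaseProxy(cli)

	srv := grpc.NewServer()
	pb.RegisterLeaseServer(srv, leaseServer)

	// Stop serving once the lease proxy reports shutdown.
	go func() {
		<-leaseDonec
		srv.Stop()
	}()

	lis, err := net.Listen("tcp", "localhost:23790")
	if err != nil {
		log.Fatal(err)
	}
	if err := srv.Serve(lis); err != nil {
		log.Printf("serve stopped: %v", err)
	}
}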

proxy/grpcproxy/lease_client_adapter.go (+97, -0)

@@ -0,0 +1,97 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"golang.org/x/net/context"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+type ls2lc struct {
+	leaseServer pb.LeaseServer
+}
+
+func LeaseServerToLeaseClient(ls pb.LeaseServer) pb.LeaseClient {
+	return &ls2lc{ls}
+}
+
+func (c *ls2lc) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (*pb.LeaseGrantResponse, error) {
+	return c.leaseServer.LeaseGrant(ctx, in)
+}
+
+func (c *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (*pb.LeaseRevokeResponse, error) {
+	return c.leaseServer.LeaseRevoke(ctx, in)
+}
+
+func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
+	// ch1 is buffered so server can send error on close
+	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
+	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
+
+	cctx, ccancel := context.WithCancel(ctx)
+	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
+	lclient := &ls2lcClientStream{chanClientStream{headerc, trailerc, cli}}
+
+	sctx, scancel := context.WithCancel(ctx)
+	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
+	lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
+	go func() {
+		if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil {
+			select {
+			case srv.sendc <- err:
+			case <-sctx.Done():
+			case <-cctx.Done():
+			}
+		}
+		scancel()
+		ccancel()
+	}()
+	return lclient, nil
+}
+
+func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {
+	return c.leaseServer.LeaseTimeToLive(ctx, in)
+}
+
+// ls2lcClientStream implements Lease_LeaseKeepAliveClient
+type ls2lcClientStream struct{ chanClientStream }
+
+// ls2lcServerStream implements Lease_LeaseKeepAliveServer
+type ls2lcServerStream struct{ chanServerStream }
+
+func (s *ls2lcClientStream) Send(rr *pb.LeaseKeepAliveRequest) error {
+	return s.SendMsg(rr)
+}
+func (s *ls2lcClientStream) Recv() (*pb.LeaseKeepAliveResponse, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.LeaseKeepAliveResponse), nil
+}
+
+func (s *ls2lcServerStream) Send(rr *pb.LeaseKeepAliveResponse) error {
+	return s.SendMsg(rr)
+}
+func (s *ls2lcServerStream) Recv() (*pb.LeaseKeepAliveRequest, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.LeaseKeepAliveRequest), nil
+}
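
LeaseServerToLeaseClient adapts the in-process LeaseServer back into a pb.LeaseClient, forwarding unary calls directly and bridging the keepalive stream over in-memory channels. A hypothetical usage sketch follows; the demo function, TTL value, and error handling are assumptions for illustration, not part of this commit.

package demo

import (
	"log"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/proxy/grpcproxy"
)

// adapterDemo shows the lease proxy consumed through the generated client interface.
func adapterDemo(cli *clientv3.Client) {
	leaseServer, _ := grpcproxy.NewLeaseProxy(cli)
	leaseClient := grpcproxy.LeaseServerToLeaseClient(leaseServer)

	// Unary calls are forwarded straight to the server implementation.
	grantResp, err := leaseClient.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{TTL: 5})
	if err != nil {
		log.Fatal(err)
	}

	// The keepalive stream is bridged over the in-memory ch1/ch2 channels set up above.
	ka, err := leaseClient.LeaseKeepAlive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	if err := ka.Send(&pb.LeaseKeepAliveRequest{ID: grantResp.ID}); err != nil {
		log.Fatal(err)
	}
	kaResp, err := ka.Recv()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("lease %d refreshed, TTL=%d", kaResp.ID, kaResp.TTL)
}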