
Merge pull request #7743 from gyuho/shutdown-grpc-server

*: use gRPC server GracefulStop
Gyu-Ho Lee 8 years ago
parent commit e771c6042b
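For context: grpc.Server.GracefulStop stops the server from accepting new connections and RPCs and then blocks until all pending RPCs have finished, whereas Stop tears down open connections and in-flight RPCs immediately. A minimal standalone sketch of the difference (illustrative only, not etcd code):

package main

import (
	"net"
	"time"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err)
	}
	gs := grpc.NewServer()
	go gs.Serve(lis) // serve until stopped; no services registered in this sketch

	time.Sleep(100 * time.Millisecond)

	// gs.Stop() would close the listener and cancel in-flight RPCs right away.
	// GracefulStop also stops accepting new connections and RPCs, but waits
	// for pending RPCs to finish before returning.
	gs.GracefulStop()
}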

+ 8 - 0
embed/etcd.go

@@ -148,6 +148,14 @@ func (e *Etcd) Config() Config {
 func (e *Etcd) Close() {
 	e.closeOnce.Do(func() { close(e.stopc) })

+	// (gRPC server) stops accepting new connections,
+	// RPCs, and blocks until all pending RPCs are finished
+	for _, sctx := range e.sctxs {
+		for gs := range sctx.grpcServerC {
+			gs.GracefulStop()
+		}
+	}
+
 	for _, sctx := range e.sctxs {
 		sctx.cancel()
 	}
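The new loop in Close() drains each serveCtx's grpcServerC and calls GracefulStop on every gRPC server it receives. The range terminates because, per the embed/serve.go change below, the channel is buffered with capacity 2 (room for the insecure and secure servers) so sends never block, and serve() closes it once registration is done. A standalone sketch of this send/close/drain pattern (illustrative names, not etcd code):

package main

import "fmt"

type server struct{ name string }

func main() {
	serverC := make(chan *server, 2) // buffered: producer sends never block

	// Producer side (analogous to serveCtx.serve): send up to two servers,
	// then close the channel so the consumer's range loop can end.
	serverC <- &server{name: "insecure"}
	serverC <- &server{name: "secure"}
	close(serverC)

	// Consumer side (analogous to Etcd.Close): range receives the buffered
	// values and returns once the channel is closed and drained.
	for s := range serverC {
		fmt.Println("gracefully stopping", s.name)
	}
}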

+ 8 - 1
embed/serve.go

@@ -52,11 +52,14 @@ type serveCtx struct {
 
 	userHandlers    map[string]http.Handler
 	serviceRegister func(*grpc.Server)
+	grpcServerC     chan *grpc.Server
 }
 
 func newServeCtx() *serveCtx {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)}
+	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
+		grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
+	}
 }
 
 // serve accepts incoming connections on the listener l,
@@ -72,8 +75,11 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 	servElection := v3election.NewElectionServer(v3c)
 	servLock := v3lock.NewLockServer(v3c)
 
+	defer close(sctx.grpcServerC)
+
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil)
+		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -103,6 +109,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 
 	if sctx.secure {
 		gs := v3rpc.Server(s, tlscfg)
+		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {

+ 1 - 1
etcdmain/etcd.go

@@ -187,7 +187,7 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
 	if err != nil {
 		return nil, nil, err
 	}
-	osutil.RegisterInterruptHandler(e.Server.Stop)
+	osutil.RegisterInterruptHandler(e.Close)
 	select {
 	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
 	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
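The interrupt handler now runs e.Close, which performs the GracefulStop drain shown above, instead of the hard e.Server.Stop. osutil.RegisterInterruptHandler is etcd's own signal helper; the following is only a generic sketch of the same idea using os/signal, with a hypothetical function name, not the etcd implementation:

package main

import (
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

// registerGracefulInterrupt is a hypothetical stand-in for wiring
// osutil.RegisterInterruptHandler(e.Close): on SIGINT/SIGTERM it runs a
// graceful shutdown instead of a hard stop.
func registerGracefulInterrupt(gs *grpc.Server) {
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigc
		gs.GracefulStop() // waits for pending RPCs before returning
		os.Exit(0)
	}()
}

func main() {
	registerGracefulInterrupt(grpc.NewServer())
	select {} // block; a real server would be serving here
}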

+ 1 - 1
integration/cluster.go

@@ -719,7 +719,7 @@ func (m *member) Close() {
 		m.serverClient = nil
 	}
 	if m.grpcServer != nil {
-		m.grpcServer.Stop()
+		m.grpcServer.GracefulStop()
 		m.grpcServer = nil
 	}
 	m.s.HardStop()

+ 35 - 20
integration/v3_maintenance_test.go → integration/v3_grpc_inflight_test.go

@@ -15,63 +15,78 @@
 package integration
 
 import (
+	"sync"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
-	"google.golang.org/grpc"
-
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
+
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 )
 
-// TestV3MaintenanceHashInflight ensures inflight Hash call
-// to embedded being-stopped EtcdServer does not trigger panic.
-func TestV3MaintenanceHashInflight(t *testing.T) {
+// TestV3MaintenanceDefragmentInflightRange ensures that inflight range requests
+// do not panic the mvcc backend while defragment is running.
+func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
 	cli := clus.RandClient()
-	mvc := toGRPC(cli).Maintenance
+	kvc := toGRPC(cli).KV
+	if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
-		mvc.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
+		kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
 	}()
 
-	clus.Members[0].s.HardStop()
+	mvc := toGRPC(cli).Maintenance
+	mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
 	cancel()
 
 	<-donec
 }
 
-// TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
-// does not panic the mvcc backend while defragment is running.
-func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
+// TestV3KVInflightRangeRequests ensures that inflight requests
+// (sent before server shutdown) are gracefully handled by server-side.
+// They are either finished or canceled, but never crash the backend.
+// See https://github.com/coreos/etcd/issues/7322 for more detail.
+func TestV3KVInflightRangeRequests(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
 	cli := clus.RandClient()
 	kvc := toGRPC(cli).KV
+
 	if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
 		t.Fatal(err)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 
-	donec := make(chan struct{})
-	go func() {
-		defer close(donec)
-		kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
-	}()
+	reqN := 10 // use 500+ for fast machine
+	var wg sync.WaitGroup
+	wg.Add(reqN)
+	for i := 0; i < reqN; i++ {
+		go func() {
+			defer wg.Done()
+			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
+			if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() {
+				t.Fatalf("inflight request should be canceled with %v, got %v", context.Canceled, err)
+			}
+		}()
+	}
 
-	mvc := toGRPC(cli).Maintenance
-	mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
+	clus.Members[0].Stop(t)
 	cancel()
 
-	<-donec
+	wg.Wait()
 }
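The new test fans out reqN concurrent Range calls under a sync.WaitGroup and accepts either completion or a context.Canceled-style error. Note that testing.T.FailNow (and therefore t.Fatalf) must be called from the goroutine running the test, so a common variant of this fan-out collects worker errors on a buffered channel and reports them after wg.Wait(). A self-contained sketch of that variant (illustrative only, not the committed test; doRequest is a hypothetical stand-in for the kvc.Range call):

package main

import (
	"fmt"
	"sync"
)

// doRequest stands in for the in-flight RPC issued by each worker.
func doRequest() error { return nil }

func main() {
	const reqN = 10
	var wg sync.WaitGroup
	errc := make(chan error, reqN) // buffered so workers never block on send

	wg.Add(reqN)
	for i := 0; i < reqN; i++ {
		go func() {
			defer wg.Done()
			if err := doRequest(); err != nil {
				errc <- err // report instead of failing from the goroutine
			}
		}()
	}

	wg.Wait()
	close(errc)
	for err := range errc {
		fmt.Println("unexpected error:", err) // in a test: t.Errorf here
	}
}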

+ 5 - 13
mvcc/backend/batch_tx.go

@@ -166,19 +166,11 @@ func (t *batchTx) commit(stop bool) {
 			t.backend.mu.RLock()
 			defer t.backend.mu.RUnlock()
 
-			// batchTx.commit(true) calls *bolt.Tx.Commit, which
-			// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
-			// and subsequent *bolt.Tx.Size() call panics.
-			//
-			// This nil pointer reference panic happens when:
-			//   1. batchTx.commit(false) from newBatchTx
-			//   2. batchTx.commit(true) from stopping backend
-			//   3. batchTx.commit(false) from inflight mvcc Hash call
-			//
-			// Check if db is nil to prevent this panic
-			if t.tx.DB() != nil {
-				atomic.StoreInt64(&t.backend.size, t.tx.Size())
-			}
+			// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
+			// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
+			// Server must make sure 'batchTx.commit(false)' does not follow
+			// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
+			atomic.StoreInt64(&t.backend.size, t.tx.Size())
 			return
 		}
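The dropped guard checked t.tx.DB() for nil before reading the size; the new comment instead documents the invariant that commit(false) must never follow commit(true), because bolt clears a transaction's db and meta fields when it is committed, and a later Size() call would dereference nil. A hedged standalone sketch of that bolt behavior (illustrative only, not etcd code; assumes the github.com/boltdb/bolt package used by etcd at the time):

package main

import (
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/boltdb/bolt"
)

func main() {
	dir, err := ioutil.TempDir("", "bolt-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	db, err := bolt.Open(filepath.Join(dir, "demo.db"), 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	tx, err := db.Begin(true) // writable transaction
	if err != nil {
		log.Fatal(err)
	}
	log.Println("size while tx is live:", tx.Size()) // fine: tx.db/tx.meta are set

	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
	// After Commit the *bolt.Tx is closed and its db/meta fields are cleared,
	// so calling tx.Size() here would panic. This is the ordering that the
	// new comment in batch_tx.go requires the server to respect.
}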
 

+ 0 - 7
mvcc/kvstore.go

@@ -146,13 +146,6 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 }
 
 func (s *store) Hash() (hash uint32, revision int64, err error) {
-	// TODO: nothing should be able to call into backend when closed
-	select {
-	case <-s.stopc:
-		return 0, 0, ErrClosed
-	default:
-	}
-
 	s.b.ForceCommit()
 	h, err := s.b.Hash(DefaultIgnores)
 	return h, s.currentRev, err

+ 0 - 14
mvcc/kvstore_test.go

@@ -518,20 +518,6 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
 	return bytes
 }
 
-// TestStoreHashAfterForceCommit ensures that later Hash call to
-// closed backend with ForceCommit does not panic.
-func TestStoreHashAfterForceCommit(t *testing.T) {
-	be, tmpPath := backend.NewDefaultTmpBackend()
-	kv := NewStore(be, &lease.FakeLessor{}, nil)
-	defer os.Remove(tmpPath)
-
-	// as in EtcdServer.HardStop
-	kv.Close()
-	be.Close()
-
-	kv.Hash()
-}
-
 func newFakeStore() *store {
 	b := &fakeBackend{&fakeBatchTx{
 		Recorder:   &testutil.RecorderBuffered{},