
Merge pull request #8898 from gyuho/z2

etcdserver,embed: clean up/reorganize client/peer/corrupt handlers
Gyu-Ho Lee 8 years ago
parent commit fb9e78ff3e
2 changed files with 99 additions and 69 deletions
  1. embed/etcd.go (+57 -41)
  2. etcdserver/corrupt.go (+42 -28)

embed/etcd.go (+57 -41)

@@ -39,8 +39,8 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
-	"github.com/coreos/pkg/capnslog"
 
+	"github.com/coreos/pkg/capnslog"
 	"github.com/soheilhy/cmux"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -64,15 +64,17 @@ const (
 
 // Etcd contains a running etcd server and its listeners.
 type Etcd struct {
-	Peers            []*peerListener
-	Clients          []net.Listener
+	Peers   []*peerListener
+	Clients []net.Listener
+	// a map of contexts for the servers that serve client requests.
+	sctxs            map[string]*serveCtx
 	metricsListeners []net.Listener
-	Server           *etcdserver.EtcdServer
+
+	Server *etcdserver.EtcdServer
 
 	cfg   Config
 	stopc chan struct{}
 	errc  chan error
-	sctxs map[string]*serveCtx
 
 	closeOnce sync.Once
 }
@@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 
 	e.Server.Start()
 
-	// configure peer handlers after rafthttp.Transport started
-	ph := etcdhttp.NewPeerHandler(e.Server)
-	var peerTLScfg *tls.Config
-	if !cfg.PeerTLSInfo.Empty() {
-		if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
-			return e, err
-		}
+	if err = e.servePeers(); err != nil {
+		return e, err
 	}
-	for _, p := range e.Peers {
-		gs := v3rpc.Server(e.Server, peerTLScfg)
-		m := cmux.New(p.Listener)
-		go gs.Serve(m.Match(cmux.HTTP2()))
-		srv := &http.Server{
-			Handler:     grpcHandlerFunc(gs, ph),
-			ReadTimeout: 5 * time.Minute,
-			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
-		}
-		go srv.Serve(m.Match(cmux.Any()))
-		p.serve = func() error { return m.Serve() }
-		p.close = func(ctx context.Context) error {
-			// gracefully shutdown http.Server
-			// close open listeners, idle connections
-			// until context cancel or time-out
-			e.stopGRPCServer(gs)
-			return srv.Shutdown(ctx)
-		}
+	if err = e.serveClients(); err != nil {
+		return e, err
 	}
-
-	if err = e.serve(); err != nil {
+	if err = e.serveMetrics(); err != nil {
 		return e, err
 	}
+
 	serving = true
 	return e, nil
 }
@@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
 	return peers, nil
 }
 
+// configure peer handlers after rafthttp.Transport started
+func (e *Etcd) servePeers() (err error) {
+	ph := etcdhttp.NewPeerHandler(e.Server)
+	var peerTLScfg *tls.Config
+	if !e.cfg.PeerTLSInfo.Empty() {
+		if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
+			return err
+		}
+	}
+	for _, p := range e.Peers {
+		gs := v3rpc.Server(e.Server, peerTLScfg)
+		m := cmux.New(p.Listener)
+		go gs.Serve(m.Match(cmux.HTTP2()))
+		srv := &http.Server{
+			Handler:     grpcHandlerFunc(gs, ph),
+			ReadTimeout: 5 * time.Minute,
+			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
+		}
+		go srv.Serve(m.Match(cmux.Any()))
+		p.serve = func() error { return m.Serve() }
+		p.close = func(ctx context.Context) error {
+			// gracefully shutdown http.Server
+			// close open listeners, idle connections
+			// until context cancel or time-out
+			e.stopGRPCServer(gs)
+			return srv.Shutdown(ctx)
+		}
+	}
+
+	// start peer servers, each in its own goroutine
+	for _, pl := range e.Peers {
+		go func(l *peerListener) {
+			e.errHandler(l.serve())
+		}(pl)
+	}
+	return nil
+}
+
 func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
 	if err = cfg.ClientSelfCert(); err != nil {
 		plog.Fatalf("could not get certs (%v)", err)
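Note: servePeers (added above) uses cmux to serve both gRPC and plain HTTP on each peer listener. A minimal standalone sketch of that multiplexing pattern, assuming a placeholder address and handler rather than etcd's real peer handler:

    package main

    import (
    	"net"
    	"net/http"

    	"github.com/soheilhy/cmux"
    	"google.golang.org/grpc"
    )

    func main() {
    	// placeholder listen address; etcd uses its configured peer listeners
    	l, err := net.Listen("tcp", "127.0.0.1:2380")
    	if err != nil {
    		panic(err)
    	}
    	m := cmux.New(l)

    	// HTTP/2 connections (gRPC) are routed to the gRPC server;
    	// everything else falls through to the plain HTTP server.
    	grpcL := m.Match(cmux.HTTP2())
    	anyL := m.Match(cmux.Any())

    	gs := grpc.NewServer()
    	srv := &http.Server{Handler: http.NotFoundHandler()} // placeholder handler

    	go gs.Serve(grpcL)
    	go srv.Serve(anyL)

    	// Serve blocks, dispatching each accepted connection to the first matcher it satisfies.
    	if err := m.Serve(); err != nil {
    		panic(err)
    	}
    }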
@@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
 	return sctxs, nil
 }
 
-func (e *Etcd) serve() (err error) {
+func (e *Etcd) serveClients() (err error) {
 	if !e.cfg.ClientTLSInfo.Empty() {
 		plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
 	}
@@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) {
 		plog.Infof("cors = %s", e.cfg.CorsInfo)
 	}
 
-	// Start the peer server in a goroutine
-	for _, pl := range e.Peers {
-		go func(l *peerListener) {
-			e.errHandler(l.serve())
-		}(pl)
-	}
-
 	// Start a client server goroutine for each listen address
 	var h http.Handler
 	if e.Config().EnableV2 {
@@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) {
 			Timeout: e.cfg.GRPCKeepAliveTimeout,
 		}))
 	}
+
+	// start client servers, each in its own goroutine
 	for _, sctx := range e.sctxs {
 		go func(s *serveCtx) {
 			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
 		}(sctx)
 	}
+	return nil
+}
 
+func (e *Etcd) serveMetrics() (err error) {
 	if len(e.cfg.ListenMetricsUrls) > 0 {
 		metricsMux := http.NewServeMux()
 		etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
@@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) {
 			}(murl, ml)
 		}
 	}
-
 	return nil
 }
 
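Note: each peer's close function stops the gRPC server before draining the http.Server under the caller's context. A hedged sketch of that shutdown ordering (the helper name and the two-second hard-stop fallback are illustrative; etcd's stopGRPCServer may differ in detail):

    package embedsketch

    import (
    	"context"
    	"net/http"
    	"time"

    	"google.golang.org/grpc"
    )

    // shutdownPeer mirrors the p.close logic: gracefully stop gRPC first,
    // then close listeners and idle connections until ctx is done.
    func shutdownPeer(ctx context.Context, gs *grpc.Server, srv *http.Server) error {
    	done := make(chan struct{})
    	go func() {
    		gs.GracefulStop() // waits for in-flight RPCs to finish
    		close(done)
    	}()
    	select {
    	case <-done:
    	case <-time.After(2 * time.Second): // illustrative timeout
    		gs.Stop() // hard stop if graceful shutdown stalls
    	}
    	return srv.Shutdown(ctx)
    }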
etcdserver/corrupt.go (+42 -28)

@@ -50,34 +50,7 @@ func (s *EtcdServer) checkHashKV() error {
 	if err != nil {
 		plog.Fatalf("failed to hash kv store (%v)", err)
 	}
-	resps := []*clientv3.HashKVResponse{}
-	for _, m := range s.cluster.Members() {
-		if m.ID == s.ID() {
-			continue
-		}
-
-		cli, cerr := clientv3.New(clientv3.Config{Endpoints: m.PeerURLs})
-		if cerr != nil {
-			continue
-		}
-
-		respsLen := len(resps)
-		for _, c := range cli.Endpoints() {
-			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-			resp, herr := cli.HashKV(ctx, c, rev)
-			cancel()
-			if herr == nil {
-				cerr = herr
-				resps = append(resps, resp)
-				break
-			}
-		}
-		cli.Close()
-
-		if respsLen == len(resps) {
-			plog.Warningf("failed to hash kv for peer %s (%v)", types.ID(m.ID), cerr)
-		}
-	}
+	resps := s.getPeerHashKVs(rev)
 
 	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	err = s.linearizableReadNotify(ctx)
@@ -115,6 +88,8 @@ func (s *EtcdServer) checkHashKV() error {
 
 	for _, resp := range resps {
 		id := resp.Header.MemberId
+
+		// leader expects follower's latest revision to be less than or equal to leader's
 		if resp.Header.Revision > rev2 {
 			plog.Warningf(
 				"revision %d from member %v, expected at most %d",
@@ -123,6 +98,8 @@ func (s *EtcdServer) checkHashKV() error {
 				rev2)
 			mismatch(id)
 		}
+
+		// leader expects follower's latest compact revision to be less than or equal to leader's
 		if resp.CompactRevision > crev2 {
 			plog.Warningf(
 				"compact revision %d from member %v, expected at most %d",
@@ -132,6 +109,8 @@ func (s *EtcdServer) checkHashKV() error {
 			)
 			mismatch(id)
 		}
+
+		// if follower's compact revision equals leader's old one, then hashes must match
 		if resp.CompactRevision == crev && resp.Hash != h {
 			plog.Warningf(
 				"hash %d at revision %d from member %v, expected hash %d",
@@ -146,6 +125,41 @@ func (s *EtcdServer) checkHashKV() error {
 	return nil
 }
 
+func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
+	for _, m := range s.cluster.Members() {
+		if m.ID == s.ID() {
+			continue
+		}
+
+		cli, cerr := clientv3.New(clientv3.Config{
+			DialTimeout: s.Cfg.ReqTimeout(),
+			Endpoints:   m.PeerURLs,
+		})
+		if cerr != nil {
+			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
+			continue
+		}
+
+		respsLen := len(resps)
+		for _, c := range cli.Endpoints() {
+			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+			resp, herr := cli.HashKV(ctx, c, rev)
+			cancel()
+			if herr == nil {
+				cerr = herr
+				resps = append(resps, resp)
+				break
+			}
+		}
+		cli.Close()
+
+		if respsLen == len(resps) {
+			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
+		}
+	}
+	return resps
+}
+
 type applierV3Corrupt struct {
 	applierV3
 }
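Note: getPeerHashKVs issues clientv3 HashKV calls against each peer's URLs. A minimal sketch of a single HashKV query via the clientv3 Maintenance API (endpoint and revision are placeholders):

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/coreos/etcd/clientv3"
    )

    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
    		DialTimeout: 5 * time.Second,
    	})
    	if err != nil {
    		panic(err)
    	}
    	defer cli.Close()

    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	// HashKV hashes the endpoint's MVCC keys up to the given revision (0 = latest).
    	resp, err := cli.HashKV(ctx, "127.0.0.1:2379", 0)
    	cancel()
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("hash=%d revision=%d compact_revision=%d\n",
    		resp.Hash, resp.Header.Revision, resp.CompactRevision)
    }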