Browse Source

Merge pull request #7529 from fanminshi/fix_closing_embedded_error

embed: don't return error when closing on embed etcd
fanmin shi 8 years ago
parent
commit
fba87558a6
3 changed files with 30 additions and 7 deletions
  1. 21 3
      embed/etcd.go
  2. 4 4
      embed/serve.go
  3. 5 0
      integration/embed_test.go

+ 21 - 3
embed/etcd.go

@@ -20,6 +20,7 @@ import (
 	"net"
 	"net/http"
 	"path/filepath"
+	"sync"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
@@ -54,8 +55,11 @@ type Etcd struct {
 	Server  *etcdserver.EtcdServer
 
 	cfg   Config
+	stopc chan struct{}
 	errc  chan error
 	sctxs map[string]*serveCtx
+
+	closeOnce sync.Once
 }
 
 // StartEtcd launches the etcd server and HTTP handlers for client/server communication.
@@ -65,7 +69,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 	if err = inCfg.Validate(); err != nil {
 		return nil, err
 	}
-	e = &Etcd{cfg: *inCfg}
+	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
 	cfg := &e.cfg
 	defer func() {
 		if e != nil && err != nil {
@@ -141,6 +145,8 @@ func (e *Etcd) Config() Config {
 }
 
 func (e *Etcd) Close() {
+	e.closeOnce.Do(func() { close(e.stopc) })
+
 	for _, sctx := range e.sctxs {
 		sctx.cancel()
 	}
@@ -319,7 +325,7 @@ func (e *Etcd) serve() (err error) {
 	ph := v2http.NewPeerHandler(e.Server)
 	for _, l := range e.Peers {
 		go func(l net.Listener) {
-			e.errc <- servePeerHTTP(l, ph)
+			e.errHandler(servePeerHTTP(l, ph))
 		}(l)
 	}
 
@@ -335,8 +341,20 @@ func (e *Etcd) serve() (err error) {
 		// read timeout does not work with http close notify
 		// TODO: https://github.com/golang/go/issues/9524
 		go func(s *serveCtx) {
-			e.errc <- s.serve(e.Server, ctlscfg, v2h, e.errc)
+			e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler))
 		}(sctx)
 	}
 	return nil
 }
+
+func (e *Etcd) errHandler(err error) {
+	select {
+	case <-e.stopc:
+		return
+	default:
+	}
+	select {
+	case <-e.stopc:
+	case e.errc <- err:
+	}
+}

+ 4 - 4
embed/serve.go

@@ -62,7 +62,7 @@ func newServeCtx() *serveCtx {
 // serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
-func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error {
+func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	<-s.ReadyNotify()
 	plog.Info("ready to serve client requests")
@@ -76,7 +76,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 			sctx.serviceRegister(gs)
 		}
 		grpcl := m.Match(cmux.HTTP2())
-		go func() { errc <- gs.Serve(grpcl) }()
+		go func() { errHandler(gs.Serve(grpcl)) }()
 
 		opts := []grpc.DialOption{
 			grpc.WithInsecure(),
@@ -93,7 +93,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 			ErrorLog: logger, // do not log user error
 		}
 		httpl := m.Match(cmux.HTTP1())
-		go func() { errc <- srvhttp.Serve(httpl) }()
+		go func() { errHandler(srvhttp.Serve(httpl)) }()
 		plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
 	}
 
@@ -124,7 +124,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 			TLSConfig: tlscfg,
 			ErrorLog:  logger, // do not log user error
 		}
-		go func() { errc <- srv.Serve(tlsl) }()
+		go func() { errHandler(srv.Serve(tlsl)) }()
 
 		plog.Infof("serving client requests on %s", sctx.l.Addr().String())
 	}

+ 5 - 0
integration/embed_test.go

@@ -94,6 +94,11 @@ func TestEmbedEtcd(t *testing.T) {
 			t.Errorf("%d: expected %d clients, got %d", i, tt.wclients, len(e.Clients))
 		}
 		e.Close()
+		select {
+		case err := <-e.Err():
+			t.Errorf("#%d: unexpected error on close (%v)", i, err)
+		default:
+		}
 	}
 }