Browse Source

functional/agent: support embedded etcd

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
677894b4fa
2 changed files with 175 additions and 157 deletions
  1. 175 134
      functional/agent/handler.go
  2. 0 23
      functional/agent/utils.go

+ 175 - 134
functional/agent/handler.go

@@ -85,52 +85,140 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 	}
 	}
 }
 }
 
 
-func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
-	if srv.last != rpcpb.Operation_NOT_STARTED {
-		return &rpcpb.Response{
-			Success: false,
-			Status:  fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
-			Member:  req.Member,
-		}, nil
+func (srv *Server) createEtcdLogFile() error {
+	var err error
+	srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutput)
+	if err != nil {
+		return err
 	}
 	}
+	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutput))
+	return nil
+}
 
 
-	err := fileutil.TouchDirAll(srv.Member.BaseDir)
-	if err != nil {
-		return nil, err
+func (srv *Server) creatEtcd(fromSnapshot bool) error {
+	if !fileutil.Exist(srv.Member.EtcdExec) || srv.Member.EtcdExec != "embed" {
+		return fmt.Errorf("unknown etcd exec %q or path does not exist", srv.Member.EtcdExec)
 	}
 	}
-	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
 
 
-	if srv.etcdServer == nil {
-		if err = srv.createEtcdLogFile(); err != nil {
-			return nil, err
+	if srv.Member.EtcdExec != "embed" {
+		etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
+		if fromSnapshot {
+			etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
 		}
 		}
+		u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
+		srv.lg.Info(
+			"creating etcd command",
+			zap.String("etcd-exec", etcdPath),
+			zap.Strings("etcd-flags", etcdFlags),
+			zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
+			zap.String("failpoint-addr", u.Host),
+		)
+		srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
+		srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
+		srv.etcdCmd.Stdout = srv.etcdLogFile
+		srv.etcdCmd.Stderr = srv.etcdLogFile
+		return nil
 	}
 	}
 
 
-	if err = srv.creatEtcd(false); err != nil {
-		return nil, err
+	cfg, err := srv.Member.Etcd.EmbedConfig()
+	if err != nil {
+		return err
 	}
 	}
-	if err = srv.saveTLSAssets(); err != nil {
-		return nil, err
+
+	srv.lg.Info("starting embedded etcd", zap.String("name", cfg.Name))
+	srv.etcdServer, err = embed.StartEtcd(cfg)
+	if err != nil {
+		return err
 	}
 	}
-	if err = srv.startEtcd(); err != nil {
-		return nil, err
+	srv.lg.Info("started embedded etcd", zap.String("name", cfg.Name))
+
+	return nil
+}
+
+// start but do not wait for it to complete
+func (srv *Server) runEtcd() error {
+	errc := make(chan error)
+	go func() {
+		time.Sleep(5 * time.Second)
+		// server advertise client/peer listener had to start first
+		// before setting up proxy listener
+		errc <- srv.startProxy()
+	}()
+
+	if srv.etcdCmd != nil {
+		srv.lg.Info(
+			"starting etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+		)
+		err := srv.etcdCmd.Start()
+		perr := <-errc
+		srv.lg.Info(
+			"started etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.Errors("errors", []error{err, perr}),
+		)
+		if err != nil {
+			return err
+		}
+		return perr
 	}
 	}
-	if err = srv.loadAutoTLSAssets(); err != nil {
-		return nil, err
+
+	select {
+	case <-srv.etcdServer.Server.ReadyNotify():
+		srv.lg.Info("embedded etcd is ready")
+	case <-time.After(time.Minute):
+		srv.etcdServer.Close()
+		return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
 	}
 	}
+	return <-errc
+}
 
 
-	// wait some time for etcd listener start
-	// before setting up proxy
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
-		return nil, err
+// SIGQUIT to exit with stackstrace
+func (srv *Server) stopEtcd(sig os.Signal) error {
+	srv.stopProxy()
+
+	if srv.etcdCmd != nil {
+		srv.lg.Info(
+			"stopping etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.String("signal", sig.String()),
+		)
+
+		err := srv.etcdCmd.Process.Signal(sig)
+		if err != nil {
+			return err
+		}
+
+		errc := make(chan error)
+		go func() {
+			_, ew := srv.etcdCmd.Process.Wait()
+			errc <- ew
+			close(errc)
+		}()
+
+		select {
+		case <-time.After(5 * time.Second):
+			srv.etcdCmd.Process.Kill()
+		case e := <-errc:
+			return e
+		}
+
+		err = <-errc
+
+		srv.lg.Info(
+			"stopped etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.String("signal", sig.String()),
+			zap.Error(err),
+		)
+		return err
 	}
 	}
 
 
-	return &rpcpb.Response{
-		Success: true,
-		Status:  "start etcd PASS",
-		Member:  srv.Member,
-	}, nil
+	srv.lg.Info("stopping embedded etcd")
+	srv.etcdServer.Server.HardStop()
+	srv.etcdServer.Close()
+	srv.lg.Info("stopped embedded etcd")
+	return nil
 }
 }
 
 
 func (srv *Server) startProxy() error {
 func (srv *Server) startProxy() error {
@@ -144,6 +232,7 @@ func (srv *Server) startProxy() error {
 			return err
 			return err
 		}
 		}
 
 
+		srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
 		srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
 			Logger: srv.lg,
 			Logger: srv.lg,
 			From:   *advertiseClientURL,
 			From:   *advertiseClientURL,
@@ -167,6 +256,7 @@ func (srv *Server) startProxy() error {
 			return err
 			return err
 		}
 		}
 
 
+		srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
 		srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
 			Logger: srv.lg,
 			Logger: srv.lg,
 			From:   *advertisePeerURL,
 			From:   *advertisePeerURL,
@@ -225,51 +315,6 @@ func (srv *Server) stopProxy() {
 	}
 	}
 }
 }
 
 
-func (srv *Server) createEtcdLogFile() error {
-	var err error
-	srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutput)
-	if err != nil {
-		return err
-	}
-	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutput))
-	return nil
-}
-
-func (srv *Server) creatEtcd(fromSnapshot bool) error {
-	if !fileutil.Exist(srv.Member.EtcdExec) || srv.Member.EtcdExec != "embed" {
-		return fmt.Errorf("unknown etcd exec %q or path does not exist", srv.Member.EtcdExec)
-	}
-
-	if fileutil.Exist(srv.Member.EtcdExec) && srv.Member.EtcdExec != "embed" {
-		etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
-		if fromSnapshot {
-			etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
-		}
-		u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
-		srv.lg.Info("creating etcd command",
-			zap.String("etcd-exec", etcdPath),
-			zap.Strings("etcd-flags", etcdFlags),
-			zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
-			zap.String("failpoint-addr", u.Host),
-		)
-		srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
-		srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
-		srv.etcdCmd.Stdout = srv.etcdLogFile
-		srv.etcdCmd.Stderr = srv.etcdLogFile
-	} else if srv.Member.EtcdExec == "embed" {
-		cfg, err := srv.Member.Etcd.EmbedConfig()
-		if err != nil {
-			return err
-		}
-		srv.etcdServer, err = embed.StartEtcd(cfg)
-		if err != nil {
-			return err
-		}
-		// TODO: set up logging
-	}
-	return nil
-}
-
 // if started with manual TLS, stores TLS assets
 // if started with manual TLS, stores TLS assets
 // from tester/client to disk before starting etcd process
 // from tester/client to disk before starting etcd process
 func (srv *Server) saveTLSAssets() error {
 func (srv *Server) saveTLSAssets() error {
@@ -431,23 +476,45 @@ func (srv *Server) loadAutoTLSAssets() error {
 	return nil
 	return nil
 }
 }
 
 
-// start but do not wait for it to complete
-func (srv *Server) startEtcd() error {
-	if srv.etcdCmd != nil {
-		srv.lg.Info(
-			"started etcd",
-			zap.String("command-path", srv.etcdCmd.Path),
-		)
-		return srv.etcdCmd.Start()
+func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
+	if srv.last != rpcpb.Operation_NOT_STARTED {
+		return &rpcpb.Response{
+			Success: false,
+			Status:  fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
+			Member:  req.Member,
+		}, nil
 	}
 	}
-	select {
-	case <-srv.etcdServer.Server.ReadyNotify():
-		srv.lg.Info("started embedded etcd")
-	case <-time.After(time.Minute):
-		srv.etcdServer.Close()
-		return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
+
+	err := fileutil.TouchDirAll(srv.Member.BaseDir)
+	if err != nil {
+		return nil, err
 	}
 	}
-	return nil
+	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
+
+	if srv.etcdServer == nil {
+		if err = srv.createEtcdLogFile(); err != nil {
+			return nil, err
+		}
+	}
+
+	if err = srv.saveTLSAssets(); err != nil {
+		return nil, err
+	}
+	if err = srv.creatEtcd(false); err != nil {
+		return nil, err
+	}
+	if err = srv.runEtcd(); err != nil {
+		return nil, err
+	}
+	if err = srv.loadAutoTLSAssets(); err != nil {
+		return nil, err
+	}
+
+	return &rpcpb.Response{
+		Success: true,
+		Status:  "start etcd PASS",
+		Member:  srv.Member,
+	}, nil
 }
 }
 
 
 func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
 func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
@@ -459,25 +526,16 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
 		}
 		}
 	}
 	}
 
 
-	if err = srv.creatEtcd(false); err != nil {
-		return nil, err
-	}
 	if err = srv.saveTLSAssets(); err != nil {
 	if err = srv.saveTLSAssets(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if err = srv.startEtcd(); err != nil {
+	if err = srv.creatEtcd(false); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if err = srv.loadAutoTLSAssets(); err != nil {
+	if err = srv.runEtcd(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-
-	// wait some time for etcd listener start
-	// before setting up proxy
-	// TODO: local tests should handle port conflicts
-	// with clients on restart
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
+	if err = srv.loadAutoTLSAssets(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -489,13 +547,15 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
 }
 }
 
 
 func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
 func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
-	if err != nil {
+	if err := srv.stopEtcd(syscall.SIGTERM); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+
+	if srv.etcdServer != nil {
+		srv.etcdServer.GetLogger().Sync()
+	} else {
+		srv.etcdLogFile.Sync()
+	}
 
 
 	return &rpcpb.Response{
 	return &rpcpb.Response{
 		Success: true,
 		Success: true,
@@ -504,13 +564,10 @@ func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
 }
 }
 
 
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
 
 	if srv.etcdServer != nil {
 	if srv.etcdServer != nil {
 		srv.etcdServer.GetLogger().Sync()
 		srv.etcdServer.GetLogger().Sync()
@@ -571,25 +628,16 @@ func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response,
 }
 }
 
 
 func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
 func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
-	if err = srv.creatEtcd(true); err != nil {
-		return nil, err
-	}
 	if err = srv.saveTLSAssets(); err != nil {
 	if err = srv.saveTLSAssets(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if err = srv.startEtcd(); err != nil {
+	if err = srv.creatEtcd(true); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if err = srv.loadAutoTLSAssets(); err != nil {
+	if err = srv.runEtcd(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-
-	// wait some time for etcd listener start
-	// before setting up proxy
-	// TODO: local tests should handle port conflicts
-	// with clients on restart
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
+	if err = srv.loadAutoTLSAssets(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -601,14 +649,10 @@ func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err err
 }
 }
 
 
 func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
 func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	// exit with stackstrace
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
 
 	if srv.etcdServer != nil {
 	if srv.etcdServer != nil {
 		srv.etcdServer.GetLogger().Sync()
 		srv.etcdServer.GetLogger().Sync()
@@ -647,13 +691,10 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
 
 
 // stop proxy, etcd, delete data directory
 // stop proxy, etcd, delete data directory
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
 
 	if srv.etcdServer != nil {
 	if srv.etcdServer != nil {
 		srv.etcdServer.GetLogger().Sync()
 		srv.etcdServer.GetLogger().Sync()

+ 0 - 23
functional/agent/utils.go

@@ -79,29 +79,6 @@ func getURLAndPort(addr string) (urlAddr *url.URL, port int, err error) {
 	return urlAddr, port, err
 	return urlAddr, port, err
 }
 }
 
 
-func stopWithSig(cmd *exec.Cmd, sig os.Signal) error {
-	err := cmd.Process.Signal(sig)
-	if err != nil {
-		return err
-	}
-
-	errc := make(chan error)
-	go func() {
-		_, ew := cmd.Process.Wait()
-		errc <- ew
-		close(errc)
-	}()
-
-	select {
-	case <-time.After(5 * time.Second):
-		cmd.Process.Kill()
-	case e := <-errc:
-		return e
-	}
-	err = <-errc
-	return err
-}
-
 func cleanPageCache() error {
 func cleanPageCache() error {
 	// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
 	// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
 	// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c
 	// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c