Browse Source

server: add stop serving func to v2_client

Yicheng Qin 11 years ago
parent
commit
46974ef473
1 changed files with 49 additions and 11 deletions
  1. 49 11
      etcd/v2_client.go

+ 49 - 11
etcd/v2_client.go

@@ -25,6 +25,9 @@ import (
 // Public functions return "etcd/error".Error intentionally to figure out
 // etcd error code easily.
 type v2client struct {
+	stopped bool
+	mu      sync.RWMutex
+
 	http.Client
 	wg sync.WaitGroup
 }
@@ -36,6 +39,7 @@ func newClient(tc *tls.Config) *v2client {
 }
 
 func (c *v2client) CloseConnections() {
+	c.stop()
 	c.wg.Wait()
 	tr := c.Transport.(*http.Transport)
 	tr.CloseIdleConnections()
@@ -43,8 +47,12 @@ func (c *v2client) CloseConnections() {
 
 // CheckVersion returns true when the version check on the server returns 200.
 func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
+	if c.runOne() == false {
+		return false, clientError(errors.New("v2_client is stopped"))
+	}
+	defer c.finishOne()
+
 	resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
-	defer c.wg.Done()
 	if err != nil {
 		return false, clientError(err)
 	}
@@ -55,8 +63,12 @@ func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error)
 
 // GetVersion fetches the peer version of a cluster.
 func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) {
+	if c.runOne() == false {
+		return 0, clientError(errors.New("v2_client is stopped"))
+	}
+	defer c.finishOne()
+
 	resp, err := c.Get(url + "/version")
-	defer c.wg.Done()
 	if err != nil {
 		return 0, clientError(err)
 	}
@@ -75,8 +87,12 @@ func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) {
 }
 
 func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
+	if c.runOne() == false {
+		return nil, clientError(errors.New("v2_client is stopped"))
+	}
+	defer c.finishOne()
+
 	resp, err := c.Get(url + "/v2/admin/machines/")
-	defer c.wg.Done()
 	if err != nil {
 		return nil, clientError(err)
 	}
@@ -92,8 +108,12 @@ func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
 }
 
 func (c *v2client) GetClusterConfig(url string) (*config.ClusterConfig, *etcdErr.Error) {
+	if c.runOne() == false {
+		return nil, clientError(errors.New("v2_client is stopped"))
+	}
+	defer c.finishOne()
+
 	resp, err := c.Get(url + "/v2/admin/config")
-	defer c.wg.Done()
 	if err != nil {
 		return nil, clientError(err)
 	}
@@ -111,12 +131,16 @@ func (c *v2client) GetClusterConfig(url string) (*config.ClusterConfig, *etcdErr
 // AddMachine adds machine to the cluster.
 // The first return value is the commit index of join command.
 func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.Error {
+	if c.runOne() == false {
+		return clientError(errors.New("v2_client is stopped"))
+	}
+	defer c.finishOne()
+
 	b, _ := json.Marshal(info)
 	url = url + "/v2/admin/machines/" + name
 
 	log.Printf("Send Join Request to %s", url)
 	resp, err := c.put(url, b)
-	defer c.wg.Done()
 	if err != nil {
 		return clientError(err)
 	}
@@ -155,15 +179,9 @@ func (c *v2client) readBody(body io.ReadCloser) ([]byte, error) {
 	return b, err
 }
 
-func (c *v2client) Get(url string) (*http.Response, error) {
-	c.wg.Add(1)
-	return c.Client.Get(url)
-}
-
 // put sends server side PUT request.
 // It always follows redirects instead of stopping according to RFC 2616.
 func (c *v2client) put(urlStr string, body []byte) (*http.Response, error) {
-	c.wg.Add(1)
 	return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
 }
 
@@ -198,6 +216,26 @@ func (c *v2client) doAlwaysFollowingRedirects(method string, urlStr string, body
 	return
 }
 
+func (c *v2client) runOne() bool {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if c.stopped {
+		return false
+	}
+	c.wg.Add(1)
+	return true
+}
+
+func (c *v2client) finishOne() {
+	c.wg.Done()
+}
+
+func (c *v2client) stop() {
+	c.mu.Lock()
+	c.stopped = true
+	c.mu.Unlock()
+}
+
 func clientError(err error) *etcdErr.Error {
 	return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
 }