client.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package server
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "net/http"
  10. "strconv"
  11. etcdErr "github.com/coreos/etcd/error"
  12. "github.com/coreos/etcd/log"
  13. )
  14. // Client sends various requests using HTTP API.
  15. // It is different from raft communication, and doesn't record anything in the log.
  16. // The argument url is required to contain scheme and host only, and
  17. // there is no trailing slash in it.
  18. // Public functions return "etcd/error".Error intentionally to figure out
  19. // etcd error code easily.
  20. // TODO(yichengq): It is similar to go-etcd. But it could have many efforts
  21. // to integrate the two. Leave it for further discussion.
  22. type Client struct {
  23. http.Client
  24. }
  25. func NewClient(transport http.RoundTripper) *Client {
  26. return &Client{http.Client{Transport: transport}}
  27. }
  28. // CheckVersion returns true when the version check on the server returns 200.
  29. func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
  30. resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
  31. if err != nil {
  32. return false, clientError(err)
  33. }
  34. defer resp.Body.Close()
  35. return resp.StatusCode == 200, nil
  36. }
  37. // GetVersion fetches the peer version of a cluster.
  38. func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
  39. resp, err := c.Get(url + "/version")
  40. if err != nil {
  41. return 0, clientError(err)
  42. }
  43. defer resp.Body.Close()
  44. body, err := ioutil.ReadAll(resp.Body)
  45. if err != nil {
  46. return 0, clientError(err)
  47. }
  48. // Parse version number.
  49. version, err := strconv.Atoi(string(body))
  50. if err != nil {
  51. return 0, clientError(err)
  52. }
  53. return version, nil
  54. }
  55. func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
  56. resp, err := c.Get(url + "/v2/admin/machines")
  57. if err != nil {
  58. return nil, clientError(err)
  59. }
  60. msgs := new([]*machineMessage)
  61. if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
  62. return nil, uerr
  63. }
  64. return *msgs, nil
  65. }
  66. func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
  67. resp, err := c.Get(url + "/v2/admin/config")
  68. if err != nil {
  69. return nil, clientError(err)
  70. }
  71. config := new(ClusterConfig)
  72. if uerr := c.parseJSONResponse(resp, config); uerr != nil {
  73. return nil, uerr
  74. }
  75. return config, nil
  76. }
  77. // AddMachine adds machine to the cluster.
  78. // The first return value is the commit index of join command.
  79. func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {
  80. b, _ := json.Marshal(cmd)
  81. url = url + "/join"
  82. log.Infof("Send Join Request to %s", url)
  83. resp, err := c.put(url, b)
  84. if err != nil {
  85. return 0, clientError(err)
  86. }
  87. defer resp.Body.Close()
  88. if err := c.checkErrorResponse(resp); err != nil {
  89. return 0, err
  90. }
  91. b, err = ioutil.ReadAll(resp.Body)
  92. if err != nil {
  93. return 0, clientError(err)
  94. }
  95. index, numRead := binary.Uvarint(b)
  96. if numRead < 0 {
  97. return 0, clientError(fmt.Errorf("buf too small, or value too large"))
  98. }
  99. return index, nil
  100. }
  101. func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
  102. defer resp.Body.Close()
  103. if err := c.checkErrorResponse(resp); err != nil {
  104. return err
  105. }
  106. if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
  107. log.Debugf("Error parsing join response: %v", err)
  108. return clientError(err)
  109. }
  110. return nil
  111. }
  112. func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
  113. if resp.StatusCode != http.StatusOK {
  114. uerr := &etcdErr.Error{}
  115. if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
  116. log.Debugf("Error parsing response to etcd error: %v", err)
  117. return clientError(err)
  118. }
  119. return uerr
  120. }
  121. return nil
  122. }
  123. // put sends server side PUT request.
  124. // It always follows redirects instead of stopping according to RFC 2616.
  125. func (c *Client) put(urlStr string, body []byte) (*http.Response, error) {
  126. return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
  127. }
  128. func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
  129. var req *http.Request
  130. for redirect := 0; redirect < 10; redirect++ {
  131. req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
  132. if err != nil {
  133. return
  134. }
  135. if resp, err = c.Do(req); err != nil {
  136. if resp != nil {
  137. resp.Body.Close()
  138. }
  139. return
  140. }
  141. if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
  142. resp.Body.Close()
  143. if urlStr = resp.Header.Get("Location"); urlStr == "" {
  144. err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
  145. return
  146. }
  147. continue
  148. }
  149. return
  150. }
  151. err = errors.New("stopped after 10 redirects")
  152. return
  153. }
  154. func clientError(err error) *etcdErr.Error {
  155. return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
  156. }