client.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package server
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "strconv"
  12. etcdErr "github.com/coreos/etcd/error"
  13. "github.com/coreos/etcd/log"
  14. )
  15. // Client sends various requests using HTTP API.
  16. // It is different from raft communication, and doesn't record anything in the log.
  17. // Public functions return "etcd/error".Error intentionally to figure out
  18. // etcd error code easily.
  19. // TODO(yichengq): It is similar to go-etcd. But it could have many efforts
  20. // to integrate the two. Leave it for further discussion.
  21. type Client struct {
  22. http.Client
  23. }
  24. func NewClient(transport http.RoundTripper) *Client {
  25. return &Client{http.Client{Transport: transport}}
  26. }
  27. // CheckVersion checks whether the version is available.
  28. func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
  29. resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
  30. if err != nil {
  31. return false, clientError(err)
  32. }
  33. defer resp.Body.Close()
  34. return resp.StatusCode == 200, nil
  35. }
  36. // GetVersion fetches the peer version of a cluster.
  37. func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
  38. resp, err := c.Get(url + "/version")
  39. if err != nil {
  40. return 0, clientError(err)
  41. }
  42. defer resp.Body.Close()
  43. body, err := ioutil.ReadAll(resp.Body)
  44. if err != nil {
  45. return 0, clientError(err)
  46. }
  47. // Parse version number.
  48. version, _ := strconv.Atoi(string(body))
  49. return version, nil
  50. }
  51. func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
  52. resp, err := c.Get(url + "/v2/admin/machines")
  53. if err != nil {
  54. return nil, clientError(err)
  55. }
  56. msgs := new([]*machineMessage)
  57. if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
  58. return nil, uerr
  59. }
  60. return *msgs, nil
  61. }
  62. func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
  63. resp, err := c.Get(url + "/v2/admin/config")
  64. if err != nil {
  65. return nil, clientError(err)
  66. }
  67. config := new(ClusterConfig)
  68. if uerr := c.parseJSONResponse(resp, config); uerr != nil {
  69. return nil, uerr
  70. }
  71. return config, nil
  72. }
  73. // AddMachine adds machine to the cluster.
  74. // The first return value is the commit index of join command.
  75. func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {
  76. b, _ := json.Marshal(cmd)
  77. url = url + "/join"
  78. log.Infof("Send Join Request to %s", url)
  79. resp, err := c.put(url, b)
  80. if err != nil {
  81. return 0, clientError(err)
  82. }
  83. defer resp.Body.Close()
  84. if err := c.checkErrorResponse(resp); err != nil {
  85. return 0, err
  86. }
  87. b, err = ioutil.ReadAll(resp.Body)
  88. if err != nil {
  89. return 0, clientError(err)
  90. }
  91. index, numRead := binary.Uvarint(b)
  92. if numRead < 0 {
  93. return 0, clientError(fmt.Errorf("buf too small, or value too large"))
  94. }
  95. return index, nil
  96. }
  97. func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
  98. defer resp.Body.Close()
  99. if err := c.checkErrorResponse(resp); err != nil {
  100. return err
  101. }
  102. if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
  103. log.Debugf("Error parsing join response: %v", err)
  104. return clientError(err)
  105. }
  106. return nil
  107. }
  108. func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
  109. if resp.StatusCode != http.StatusOK {
  110. uerr := &etcdErr.Error{}
  111. if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
  112. log.Debugf("Error parsing response to etcd error: %v", err)
  113. return clientError(err)
  114. }
  115. return uerr
  116. }
  117. return nil
  118. }
  119. // put sends server side PUT request.
  120. // It always follows redirects instead of stopping according to RFC 2616.
  121. func (c *Client) put(urlStr string, body []byte) (*http.Response, error) {
  122. req, err := http.NewRequest("PUT", urlStr, bytes.NewBuffer(body))
  123. if err != nil {
  124. return nil, err
  125. }
  126. return c.doAlwaysFollowingRedirects(req, body)
  127. }
  128. // doAlwaysFollowingRedirects provides similar functionality as standard one,
  129. // but it does redirect with the same method for PUT or POST requests.
  130. // Part of the code is borrowed from pkg/net/http/client.go.
  131. func (c *Client) doAlwaysFollowingRedirects(ireq *http.Request, body []byte) (resp *http.Response, err error) {
  132. var base *url.URL
  133. redirectChecker := c.CheckRedirect
  134. if redirectChecker == nil {
  135. redirectChecker = defaultCheckRedirect
  136. }
  137. var via []*http.Request
  138. req := ireq
  139. urlStr := "" // next relative or absolute URL to fetch (after first request)
  140. for redirect := 0; ; redirect++ {
  141. if redirect != 0 {
  142. req, err = http.NewRequest(ireq.Method, urlStr, bytes.NewBuffer(body))
  143. if err != nil {
  144. break
  145. }
  146. req.URL = base.ResolveReference(req.URL)
  147. if len(via) > 0 {
  148. // Add the Referer header.
  149. lastReq := via[len(via)-1]
  150. if lastReq.URL.Scheme != "https" {
  151. req.Header.Set("Referer", lastReq.URL.String())
  152. }
  153. err = redirectChecker(req, via)
  154. if err != nil {
  155. break
  156. }
  157. }
  158. }
  159. urlStr = req.URL.String()
  160. // It uses exported Do method here.
  161. // It is more elegant to use unexported send method, but that will
  162. // introduce many redundant code.
  163. if resp, err = c.Do(req); err != nil {
  164. break
  165. }
  166. if shouldExtraRedirectPost(resp.StatusCode) {
  167. resp.Body.Close()
  168. if urlStr = resp.Header.Get("Location"); urlStr == "" {
  169. err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
  170. break
  171. }
  172. base = req.URL
  173. via = append(via, req)
  174. continue
  175. }
  176. return
  177. }
  178. if resp != nil {
  179. resp.Body.Close()
  180. }
  181. return nil, err
  182. }
  183. func shouldExtraRedirectPost(statusCode int) bool {
  184. switch statusCode {
  185. case http.StatusMovedPermanently, http.StatusTemporaryRedirect:
  186. return true
  187. }
  188. return false
  189. }
  190. func defaultCheckRedirect(req *http.Request, via []*http.Request) error {
  191. if len(via) >= 10 {
  192. return errors.New("stopped after 10 redirects")
  193. }
  194. return nil
  195. }
  196. func clientError(err error) *etcdErr.Error {
  197. return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
  198. }