v2_client.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "io/ioutil"
  22. "net/http"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "github.com/coreos/etcd/conf"
  27. etcdErr "github.com/coreos/etcd/error"
  28. )
  29. // v2client sends various requests using HTTP API.
  30. // It is different from raft communication, and doesn't record anything in the log.
  31. // The argument url is required to contain scheme and host only, and
  32. // there is no trailing slash in it.
  33. // Public functions return "etcd/error".Error intentionally to figure out
  34. // etcd error code easily.
  35. type v2client struct {
  36. stopped bool
  37. mu sync.RWMutex
  38. http.Client
  39. wg sync.WaitGroup
  40. }
  41. func newClient(tc *tls.Config) *v2client {
  42. tr := new(http.Transport)
  43. tr.TLSClientConfig = tc
  44. return &v2client{Client: http.Client{Transport: tr}}
  45. }
  46. func (c *v2client) CloseConnections() {
  47. c.stop()
  48. c.wg.Wait()
  49. tr := c.Transport.(*http.Transport)
  50. tr.CloseIdleConnections()
  51. }
  52. // CheckVersion returns true when the version check on the server returns 200.
  53. func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
  54. if c.runOne() == false {
  55. return false, clientError(errors.New("v2_client is stopped"))
  56. }
  57. defer c.finishOne()
  58. resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
  59. if err != nil {
  60. return false, clientError(err)
  61. }
  62. c.readBody(resp.Body)
  63. return resp.StatusCode == 200, nil
  64. }
  65. // GetVersion fetches the peer version of a cluster.
  66. func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) {
  67. if c.runOne() == false {
  68. return 0, clientError(errors.New("v2_client is stopped"))
  69. }
  70. defer c.finishOne()
  71. resp, err := c.Get(url + "/version")
  72. if err != nil {
  73. return 0, clientError(err)
  74. }
  75. body, err := c.readBody(resp.Body)
  76. if err != nil {
  77. return 0, clientError(err)
  78. }
  79. // Parse version number.
  80. version, err := strconv.Atoi(string(body))
  81. if err != nil {
  82. return 0, clientError(err)
  83. }
  84. return version, nil
  85. }
  86. func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
  87. if c.runOne() == false {
  88. return nil, clientError(errors.New("v2_client is stopped"))
  89. }
  90. defer c.finishOne()
  91. resp, err := c.Get(url + "/v2/admin/machines/")
  92. if err != nil {
  93. return nil, clientError(err)
  94. }
  95. if resp.StatusCode != http.StatusOK {
  96. return nil, c.readErrorBody(resp.Body)
  97. }
  98. msgs := new([]*machineMessage)
  99. if uerr := c.readJSONBody(resp.Body, msgs); uerr != nil {
  100. return nil, uerr
  101. }
  102. return *msgs, nil
  103. }
  104. func (c *v2client) GetClusterConfig(url string) (*conf.ClusterConfig, *etcdErr.Error) {
  105. if c.runOne() == false {
  106. return nil, clientError(errors.New("v2_client is stopped"))
  107. }
  108. defer c.finishOne()
  109. resp, err := c.Get(url + "/v2/admin/config")
  110. if err != nil {
  111. return nil, clientError(err)
  112. }
  113. if resp.StatusCode != http.StatusOK {
  114. return nil, c.readErrorBody(resp.Body)
  115. }
  116. cfg := new(conf.ClusterConfig)
  117. if uerr := c.readJSONBody(resp.Body, cfg); uerr != nil {
  118. return nil, uerr
  119. }
  120. return cfg, nil
  121. }
  122. // AddMachine adds machine to the cluster.
  123. // The first return value is the commit index of join command.
  124. func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.Error {
  125. if c.runOne() == false {
  126. return clientError(errors.New("v2_client is stopped"))
  127. }
  128. defer c.finishOne()
  129. b, _ := json.Marshal(info)
  130. url = url + "/v2/admin/machines/" + name
  131. resp, err := c.put(url, b)
  132. if err != nil {
  133. return clientError(err)
  134. }
  135. if resp.StatusCode != http.StatusOK {
  136. return c.readErrorBody(resp.Body)
  137. }
  138. c.readBody(resp.Body)
  139. return nil
  140. }
  141. func (c *v2client) readErrorBody(body io.ReadCloser) *etcdErr.Error {
  142. b, err := c.readBody(body)
  143. if err != nil {
  144. return clientError(err)
  145. }
  146. uerr := &etcdErr.Error{}
  147. if err := json.Unmarshal(b, uerr); err != nil {
  148. str := strings.TrimSpace(string(b))
  149. return etcdErr.NewError(etcdErr.EcodeClientInternal, str, 0)
  150. }
  151. return nil
  152. }
  153. func (c *v2client) readJSONBody(body io.ReadCloser, val interface{}) *etcdErr.Error {
  154. if err := json.NewDecoder(body).Decode(val); err != nil {
  155. return clientError(err)
  156. }
  157. c.readBody(body)
  158. return nil
  159. }
  160. func (c *v2client) readBody(body io.ReadCloser) ([]byte, error) {
  161. b, err := ioutil.ReadAll(body)
  162. body.Close()
  163. return b, err
  164. }
  165. // put sends server side PUT request.
  166. // It always follows redirects instead of stopping according to RFC 2616.
  167. func (c *v2client) put(urlStr string, body []byte) (*http.Response, error) {
  168. return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
  169. }
  170. func (c *v2client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
  171. var req *http.Request
  172. for redirect := 0; redirect < 10; redirect++ {
  173. req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
  174. if err != nil {
  175. return
  176. }
  177. if resp, err = c.Do(req); err != nil {
  178. if resp != nil {
  179. resp.Body.Close()
  180. }
  181. return
  182. }
  183. if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
  184. resp.Body.Close()
  185. if urlStr = resp.Header.Get("Location"); urlStr == "" {
  186. err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
  187. return
  188. }
  189. continue
  190. }
  191. return
  192. }
  193. err = errors.New("stopped after 10 redirects")
  194. return
  195. }
  196. func (c *v2client) runOne() bool {
  197. c.mu.RLock()
  198. defer c.mu.RUnlock()
  199. if c.stopped {
  200. return false
  201. }
  202. c.wg.Add(1)
  203. return true
  204. }
  205. func (c *v2client) finishOne() {
  206. c.wg.Done()
  207. }
  208. func (c *v2client) stop() {
  209. c.mu.Lock()
  210. c.stopped = true
  211. c.mu.Unlock()
  212. }
  213. func clientError(err error) *etcdErr.Error {
  214. return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
  215. }