v2_client.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "io/ioutil"
  22. "log"
  23. "net/http"
  24. "strconv"
  25. "strings"
  26. "sync"
  27. "github.com/coreos/etcd/config"
  28. etcdErr "github.com/coreos/etcd/error"
  29. )
  // v2client issues requests against the etcd HTTP API.
  //
  // This traffic is separate from raft communication and records nothing in
  // the log. Every url argument must consist of scheme and host only, without
  // a trailing slash. Public methods deliberately return "etcd/error".Error so
  // that callers can read the etcd error code easily.
  type v2client struct {
  	// Client is the embedded HTTP client that performs the requests.
  	http.Client

  	// mu guards stopped.
  	mu sync.RWMutex
  	// stopped is set by stop; runOne refuses new requests once it is true.
  	stopped bool

  	// wg counts in-flight requests: runOne adds one, finishOne releases it.
  	wg sync.WaitGroup
  }
  42. func newClient(tc *tls.Config) *v2client {
  43. tr := new(http.Transport)
  44. tr.TLSClientConfig = tc
  45. return &v2client{Client: http.Client{Transport: tr}}
  46. }
  47. func (c *v2client) CloseConnections() {
  48. c.stop()
  49. c.wg.Wait()
  50. tr := c.Transport.(*http.Transport)
  51. tr.CloseIdleConnections()
  52. }
  53. // CheckVersion returns true when the version check on the server returns 200.
  54. func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
  55. if c.runOne() == false {
  56. return false, clientError(errors.New("v2_client is stopped"))
  57. }
  58. defer c.finishOne()
  59. resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
  60. if err != nil {
  61. return false, clientError(err)
  62. }
  63. c.readBody(resp.Body)
  64. return resp.StatusCode == 200, nil
  65. }
  66. // GetVersion fetches the peer version of a cluster.
  67. func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) {
  68. if c.runOne() == false {
  69. return 0, clientError(errors.New("v2_client is stopped"))
  70. }
  71. defer c.finishOne()
  72. resp, err := c.Get(url + "/version")
  73. if err != nil {
  74. return 0, clientError(err)
  75. }
  76. body, err := c.readBody(resp.Body)
  77. if err != nil {
  78. return 0, clientError(err)
  79. }
  80. // Parse version number.
  81. version, err := strconv.Atoi(string(body))
  82. if err != nil {
  83. return 0, clientError(err)
  84. }
  85. return version, nil
  86. }
  87. func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
  88. if c.runOne() == false {
  89. return nil, clientError(errors.New("v2_client is stopped"))
  90. }
  91. defer c.finishOne()
  92. resp, err := c.Get(url + "/v2/admin/machines/")
  93. if err != nil {
  94. return nil, clientError(err)
  95. }
  96. if resp.StatusCode != http.StatusOK {
  97. return nil, c.readErrorBody(resp.Body)
  98. }
  99. msgs := new([]*machineMessage)
  100. if uerr := c.readJSONBody(resp.Body, msgs); uerr != nil {
  101. return nil, uerr
  102. }
  103. return *msgs, nil
  104. }
  105. func (c *v2client) GetClusterConfig(url string) (*config.ClusterConfig, *etcdErr.Error) {
  106. if c.runOne() == false {
  107. return nil, clientError(errors.New("v2_client is stopped"))
  108. }
  109. defer c.finishOne()
  110. resp, err := c.Get(url + "/v2/admin/config")
  111. if err != nil {
  112. return nil, clientError(err)
  113. }
  114. if resp.StatusCode != http.StatusOK {
  115. return nil, c.readErrorBody(resp.Body)
  116. }
  117. config := new(config.ClusterConfig)
  118. if uerr := c.readJSONBody(resp.Body, config); uerr != nil {
  119. return nil, uerr
  120. }
  121. return config, nil
  122. }
  123. // AddMachine adds machine to the cluster.
  124. // The first return value is the commit index of join command.
  125. func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.Error {
  126. if c.runOne() == false {
  127. return clientError(errors.New("v2_client is stopped"))
  128. }
  129. defer c.finishOne()
  130. b, _ := json.Marshal(info)
  131. url = url + "/v2/admin/machines/" + name
  132. log.Printf("Send Join Request to %s", url)
  133. resp, err := c.put(url, b)
  134. if err != nil {
  135. return clientError(err)
  136. }
  137. if resp.StatusCode != http.StatusOK {
  138. return c.readErrorBody(resp.Body)
  139. }
  140. c.readBody(resp.Body)
  141. return nil
  142. }
  143. func (c *v2client) readErrorBody(body io.ReadCloser) *etcdErr.Error {
  144. b, err := c.readBody(body)
  145. if err != nil {
  146. return clientError(err)
  147. }
  148. uerr := &etcdErr.Error{}
  149. if err := json.Unmarshal(b, uerr); err != nil {
  150. str := strings.TrimSpace(string(b))
  151. return etcdErr.NewError(etcdErr.EcodeClientInternal, str, 0)
  152. }
  153. return nil
  154. }
  155. func (c *v2client) readJSONBody(body io.ReadCloser, val interface{}) *etcdErr.Error {
  156. if err := json.NewDecoder(body).Decode(val); err != nil {
  157. log.Printf("Error parsing join response: %v", err)
  158. return clientError(err)
  159. }
  160. c.readBody(body)
  161. return nil
  162. }
  163. func (c *v2client) readBody(body io.ReadCloser) ([]byte, error) {
  164. b, err := ioutil.ReadAll(body)
  165. body.Close()
  166. return b, err
  167. }
  168. // put sends server side PUT request.
  169. // It always follows redirects instead of stopping according to RFC 2616.
  170. func (c *v2client) put(urlStr string, body []byte) (*http.Response, error) {
  171. return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
  172. }
  173. func (c *v2client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
  174. var req *http.Request
  175. for redirect := 0; redirect < 10; redirect++ {
  176. req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
  177. if err != nil {
  178. return
  179. }
  180. if resp, err = c.Do(req); err != nil {
  181. if resp != nil {
  182. resp.Body.Close()
  183. }
  184. return
  185. }
  186. if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
  187. resp.Body.Close()
  188. if urlStr = resp.Header.Get("Location"); urlStr == "" {
  189. err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
  190. return
  191. }
  192. continue
  193. }
  194. return
  195. }
  196. err = errors.New("stopped after 10 redirects")
  197. return
  198. }
  199. func (c *v2client) runOne() bool {
  200. c.mu.RLock()
  201. defer c.mu.RUnlock()
  202. if c.stopped {
  203. return false
  204. }
  205. c.wg.Add(1)
  206. return true
  207. }
  208. func (c *v2client) finishOne() {
  209. c.wg.Done()
  210. }
  211. func (c *v2client) stop() {
  212. c.mu.Lock()
  213. c.stopped = true
  214. c.mu.Unlock()
  215. }
  216. func clientError(err error) *etcdErr.Error {
  217. return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
  218. }