discovery.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "os"
  21. "path"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/client"
  28. )
  // Sentinel errors returned by the discovery process.
var (
	// ErrInvalidURL reports an unusable discovery URL.
	ErrInvalidURL = errors.New("discovery: invalid URL")

	// ErrBadSizeKey reports that the cluster size key holds a value that
	// cannot be used as a cluster size.
	ErrBadSizeKey = errors.New("discovery: size key is bad")

	// ErrSizeNotFound reports that the cluster size key does not exist.
	ErrSizeNotFound = errors.New("discovery: size key not found")

	// ErrTokenNotFound reports that the discovery token was not found.
	ErrTokenNotFound = errors.New("discovery: token not found")

	// ErrDuplicateID reports that a duplicate member id was found.
	ErrDuplicateID = errors.New("discovery: found duplicate id")

	// ErrFullCluster reports that the cluster already has its full set of
	// members registered ahead of this one.
	ErrFullCluster = errors.New("discovery: cluster is full")

	// ErrTooManyRetries reports that the retry budget (nRetries) for timed
	// out requests has been exhausted.
	ErrTooManyRetries = errors.New("discovery: too many retries")
)
  const (
	// DiscoveryProxyEnv names the environment variable that configures an
	// HTTP proxy for discovery requests.
	DiscoveryProxyEnv = "ETCD_DISCOVERY_PROXY"

	// nRetries caps how many timed-out requests discovery retries before
	// giving up with ErrTooManyRetries.
	nRetries uint = 3
)
  // Discoverer runs cluster discovery and returns the resulting cluster
// configuration string.
type Discoverer interface {
	// Discover blocks until discovery completes or fails.
	Discover() (string, error)
}
  47. type discovery struct {
  48. cluster string
  49. id uint64
  50. config string
  51. c client.Client
  52. retries uint
  53. url *url.URL
  54. clock clockwork.Clock
  55. }
  56. // proxyFuncFromEnv builds a proxy function if the appropriate environment
  57. // variable is set. It performs basic sanitization of the environment variable
  58. // and returns any error encountered.
  59. func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) {
  60. proxy := os.Getenv(DiscoveryProxyEnv)
  61. if proxy == "" {
  62. return nil, nil
  63. }
  64. // Do a small amount of URL sanitization to help the user
  65. // Derived from net/http.ProxyFromEnvironment
  66. proxyURL, err := url.Parse(proxy)
  67. if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
  68. // proxy was bogus. Try prepending "http://" to it and
  69. // see if that parses correctly. If not, we ignore the
  70. // error and complain about the original one
  71. var err2 error
  72. proxyURL, err2 = url.Parse("http://" + proxy)
  73. if err2 == nil {
  74. err = nil
  75. }
  76. }
  77. if err != nil {
  78. return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
  79. }
  80. log.Printf("discovery: using proxy %q", proxyURL.String())
  81. return http.ProxyURL(proxyURL), nil
  82. }
  83. func New(durl string, id uint64, config string) (Discoverer, error) {
  84. u, err := url.Parse(durl)
  85. if err != nil {
  86. return nil, err
  87. }
  88. token := u.Path
  89. u.Path = ""
  90. pf, err := proxyFuncFromEnv()
  91. if err != nil {
  92. return nil, err
  93. }
  94. c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, u.String(), time.Second*5)
  95. if err != nil {
  96. return nil, err
  97. }
  98. // discovery service redirects /[key] to /v2/keys/[key]
  99. // set the prefix of client to "" to handle this
  100. c.SetPrefix("")
  101. return &discovery{
  102. cluster: token,
  103. id: id,
  104. config: config,
  105. c: c,
  106. url: u,
  107. clock: clockwork.NewRealClock(),
  108. }, nil
  109. }
  110. func (d *discovery) Discover() (string, error) {
  111. // fast path: if the cluster is full, returns the error
  112. // do not need to register itself to the cluster in this
  113. // case.
  114. if _, _, err := d.checkCluster(); err != nil {
  115. return "", err
  116. }
  117. if err := d.createSelf(); err != nil {
  118. // Fails, even on a timeout, if createSelf times out.
  119. // TODO(barakmich): Retrying the same node might want to succeed here
  120. // (ie, createSelf should be idempotent for discovery).
  121. return "", err
  122. }
  123. nodes, size, err := d.checkCluster()
  124. if err != nil {
  125. return "", err
  126. }
  127. all, err := d.waitNodes(nodes, size)
  128. if err != nil {
  129. return "", err
  130. }
  131. return nodesToCluster(all), nil
  132. }
  133. func (d *discovery) createSelf() error {
  134. resp, err := d.c.Create(d.selfKey(), d.config, -1)
  135. if err != nil {
  136. return err
  137. }
  138. // ensure self appears on the server we connected to
  139. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  140. _, err = w.Next()
  141. return err
  142. }
  143. func (d *discovery) checkCluster() (client.Nodes, int, error) {
  144. configKey := path.Join("/", d.cluster, "_config")
  145. // find cluster size
  146. resp, err := d.c.Get(path.Join(configKey, "size"))
  147. if err != nil {
  148. if err == client.ErrKeyNoExist {
  149. return nil, 0, ErrSizeNotFound
  150. }
  151. if err == client.ErrTimeout {
  152. return d.checkClusterRetry()
  153. }
  154. return nil, 0, err
  155. }
  156. size, err := strconv.Atoi(resp.Node.Value)
  157. if err != nil {
  158. return nil, 0, ErrBadSizeKey
  159. }
  160. resp, err = d.c.Get(d.cluster)
  161. if err != nil {
  162. if err == client.ErrTimeout {
  163. return d.checkClusterRetry()
  164. }
  165. return nil, 0, err
  166. }
  167. nodes := make(client.Nodes, 0)
  168. // append non-config keys to nodes
  169. for _, n := range resp.Node.Nodes {
  170. if !strings.Contains(n.Key, configKey) {
  171. nodes = append(nodes, n)
  172. }
  173. }
  174. snodes := sortableNodes{nodes}
  175. sort.Sort(snodes)
  176. // find self position
  177. for i := range nodes {
  178. if strings.Contains(nodes[i].Key, d.selfKey()) {
  179. break
  180. }
  181. if i >= size-1 {
  182. return nil, size, ErrFullCluster
  183. }
  184. }
  185. return nodes, size, nil
  186. }
  187. func (d *discovery) logAndBackoffForRetry(step string) {
  188. d.retries++
  189. retryTime := time.Second * (0x1 << d.retries)
  190. log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
  191. d.clock.Sleep(retryTime)
  192. }
  193. func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
  194. if d.retries < nRetries {
  195. d.logAndBackoffForRetry("cluster status check")
  196. return d.checkCluster()
  197. }
  198. return nil, 0, ErrTooManyRetries
  199. }
  200. func (d *discovery) waitNodesRetry() (client.Nodes, error) {
  201. if d.retries < nRetries {
  202. d.logAndBackoffForRetry("waiting for other nodes")
  203. nodes, n, err := d.checkCluster()
  204. if err != nil {
  205. return nil, err
  206. }
  207. return d.waitNodes(nodes, n)
  208. }
  209. return nil, ErrTooManyRetries
  210. }
  211. func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
  212. if len(nodes) > size {
  213. nodes = nodes[:size]
  214. }
  215. w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
  216. all := make(client.Nodes, len(nodes))
  217. copy(all, nodes)
  218. // wait for others
  219. for len(all) < size {
  220. resp, err := w.Next()
  221. if err != nil {
  222. if err == client.ErrTimeout {
  223. return d.waitNodesRetry()
  224. }
  225. return nil, err
  226. }
  227. all = append(all, resp.Node)
  228. }
  229. return all, nil
  230. }
  231. func (d *discovery) selfKey() string {
  232. return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
  233. }
  234. func nodesToCluster(ns client.Nodes) string {
  235. s := make([]string, len(ns))
  236. for i, n := range ns {
  237. s[i] = n.Value
  238. }
  239. return strings.Join(s, ",")
  240. }
  241. type sortableNodes struct{ client.Nodes }
  242. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  243. func (ns sortableNodes) Less(i, j int) bool {
  244. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  245. }
  246. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }