discovery.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "os"
  21. "path"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/client"
  28. )
  29. var (
  30. ErrInvalidURL = errors.New("discovery: invalid URL")
  31. ErrBadSizeKey = errors.New("discovery: size key is bad")
  32. ErrSizeNotFound = errors.New("discovery: size key not found")
  33. ErrTokenNotFound = errors.New("discovery: token not found")
  34. ErrDuplicateID = errors.New("discovery: found duplicate id")
  35. ErrFullCluster = errors.New("discovery: cluster is full")
  36. ErrTooManyRetries = errors.New("discovery: too many retries")
  37. )
  38. const (
  39. // Environment variable used to configure an HTTP proxy for discovery
  40. DiscoveryProxyEnv = "ETCD_DISCOVERY_PROXY"
  41. // Number of retries discovery will attempt before giving up and erroring out.
  42. nRetries = uint(3)
  43. )
  44. type Discoverer interface {
  45. Discover() (string, error)
  46. }
// discovery is the Discoverer implementation built by New. It registers this
// member under a cluster token on a discovery service and waits for the other
// members to register.
type discovery struct {
	// cluster is the path component of the discovery URL; it prefixes every
	// key this type reads or writes.
	cluster string
	// id identifies this member; its decimal form is the last element of the
	// member's own key (see selfKey).
	id uint64
	// config is the value stored under this member's key by createSelf.
	config string
	// c is the keys API client used for all requests.
	c client.KeysAPI
	// retries counts the timed-out requests retried so far; it is compared
	// against nRetries and also drives the backoff delay.
	retries uint
	// url is the discovery URL with its path cleared; it is only used in log
	// messages within this file.
	url *url.URL
	// clock provides Sleep for the retry backoff.
	clock clockwork.Clock
}
  56. // proxyFuncFromEnv builds a proxy function if the appropriate environment
  57. // variable is set. It performs basic sanitization of the environment variable
  58. // and returns any error encountered.
  59. func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) {
  60. proxy := os.Getenv(DiscoveryProxyEnv)
  61. if proxy == "" {
  62. return nil, nil
  63. }
  64. // Do a small amount of URL sanitization to help the user
  65. // Derived from net/http.ProxyFromEnvironment
  66. proxyURL, err := url.Parse(proxy)
  67. if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
  68. // proxy was bogus. Try prepending "http://" to it and
  69. // see if that parses correctly. If not, we ignore the
  70. // error and complain about the original one
  71. var err2 error
  72. proxyURL, err2 = url.Parse("http://" + proxy)
  73. if err2 == nil {
  74. err = nil
  75. }
  76. }
  77. if err != nil {
  78. return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
  79. }
  80. log.Printf("discovery: using proxy %q", proxyURL.String())
  81. return http.ProxyURL(proxyURL), nil
  82. }
  83. func New(durl string, id uint64, config string) (Discoverer, error) {
  84. u, err := url.Parse(durl)
  85. if err != nil {
  86. return nil, err
  87. }
  88. token := u.Path
  89. u.Path = ""
  90. pf, err := proxyFuncFromEnv()
  91. if err != nil {
  92. return nil, err
  93. }
  94. c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, u.String(), client.DefaultRequestTimeout)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return &discovery{
  99. cluster: token,
  100. id: id,
  101. config: config,
  102. c: c,
  103. url: u,
  104. clock: clockwork.NewRealClock(),
  105. }, nil
  106. }
  107. func (d *discovery) Discover() (string, error) {
  108. // fast path: if the cluster is full, returns the error
  109. // do not need to register itself to the cluster in this
  110. // case.
  111. if _, _, err := d.checkCluster(); err != nil {
  112. return "", err
  113. }
  114. if err := d.createSelf(); err != nil {
  115. // Fails, even on a timeout, if createSelf times out.
  116. // TODO(barakmich): Retrying the same node might want to succeed here
  117. // (ie, createSelf should be idempotent for discovery).
  118. return "", err
  119. }
  120. nodes, size, err := d.checkCluster()
  121. if err != nil {
  122. return "", err
  123. }
  124. all, err := d.waitNodes(nodes, size)
  125. if err != nil {
  126. return "", err
  127. }
  128. return nodesToCluster(all), nil
  129. }
  130. func (d *discovery) createSelf() error {
  131. resp, err := d.c.Create(d.selfKey(), d.config, -1)
  132. if err != nil {
  133. return err
  134. }
  135. // ensure self appears on the server we connected to
  136. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  137. _, err = w.Next()
  138. return err
  139. }
  140. func (d *discovery) checkCluster() (client.Nodes, int, error) {
  141. configKey := path.Join("/", d.cluster, "_config")
  142. // find cluster size
  143. resp, err := d.c.Get(path.Join(configKey, "size"))
  144. if err != nil {
  145. if err == client.ErrKeyNoExist {
  146. return nil, 0, ErrSizeNotFound
  147. }
  148. if err == client.ErrTimeout {
  149. return d.checkClusterRetry()
  150. }
  151. return nil, 0, err
  152. }
  153. size, err := strconv.Atoi(resp.Node.Value)
  154. if err != nil {
  155. return nil, 0, ErrBadSizeKey
  156. }
  157. resp, err = d.c.Get(d.cluster)
  158. if err != nil {
  159. if err == client.ErrTimeout {
  160. return d.checkClusterRetry()
  161. }
  162. return nil, 0, err
  163. }
  164. nodes := make(client.Nodes, 0)
  165. // append non-config keys to nodes
  166. for _, n := range resp.Node.Nodes {
  167. if !strings.Contains(n.Key, configKey) {
  168. nodes = append(nodes, n)
  169. }
  170. }
  171. snodes := sortableNodes{nodes}
  172. sort.Sort(snodes)
  173. // find self position
  174. for i := range nodes {
  175. if strings.Contains(nodes[i].Key, d.selfKey()) {
  176. break
  177. }
  178. if i >= size-1 {
  179. return nil, size, ErrFullCluster
  180. }
  181. }
  182. return nodes, size, nil
  183. }
  184. func (d *discovery) logAndBackoffForRetry(step string) {
  185. d.retries++
  186. retryTime := time.Second * (0x1 << d.retries)
  187. log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
  188. d.clock.Sleep(retryTime)
  189. }
  190. func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
  191. if d.retries < nRetries {
  192. d.logAndBackoffForRetry("cluster status check")
  193. return d.checkCluster()
  194. }
  195. return nil, 0, ErrTooManyRetries
  196. }
  197. func (d *discovery) waitNodesRetry() (client.Nodes, error) {
  198. if d.retries < nRetries {
  199. d.logAndBackoffForRetry("waiting for other nodes")
  200. nodes, n, err := d.checkCluster()
  201. if err != nil {
  202. return nil, err
  203. }
  204. return d.waitNodes(nodes, n)
  205. }
  206. return nil, ErrTooManyRetries
  207. }
  208. func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
  209. if len(nodes) > size {
  210. nodes = nodes[:size]
  211. }
  212. w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
  213. all := make(client.Nodes, len(nodes))
  214. copy(all, nodes)
  215. // wait for others
  216. for len(all) < size {
  217. resp, err := w.Next()
  218. if err != nil {
  219. if err == client.ErrTimeout {
  220. return d.waitNodesRetry()
  221. }
  222. return nil, err
  223. }
  224. all = append(all, resp.Node)
  225. }
  226. return all, nil
  227. }
  228. func (d *discovery) selfKey() string {
  229. return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
  230. }
  231. func nodesToCluster(ns client.Nodes) string {
  232. s := make([]string, len(ns))
  233. for i, n := range ns {
  234. s[i] = n.Value
  235. }
  236. return strings.Join(s, ",")
  237. }
  238. type sortableNodes struct{ client.Nodes }
  239. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  240. func (ns sortableNodes) Less(i, j int) bool {
  241. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  242. }
  243. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }