discovery.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "os"
  21. "path"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/client"
  28. "github.com/coreos/etcd/pkg/strutil"
  29. )
  30. var (
  31. ErrInvalidURL = errors.New("discovery: invalid URL")
  32. ErrBadSizeKey = errors.New("discovery: size key is bad")
  33. ErrSizeNotFound = errors.New("discovery: size key not found")
  34. ErrTokenNotFound = errors.New("discovery: token not found")
  35. ErrDuplicateID = errors.New("discovery: found duplicate id")
  36. ErrFullCluster = errors.New("discovery: cluster is full")
  37. ErrTooManyRetries = errors.New("discovery: too many retries")
  38. )
  39. const (
  40. // Environment variable used to configure an HTTP proxy for discovery
  41. DiscoveryProxyEnv = "ETCD_DISCOVERY_PROXY"
  42. // Number of retries discovery will attempt before giving up and erroring out.
  43. nRetries = uint(3)
  44. )
  45. type Discoverer interface {
  46. Discover() (string, error)
  47. }
  48. type discovery struct {
  49. cluster string
  50. id uint64
  51. config string
  52. c client.KeysAPI
  53. retries uint
  54. url *url.URL
  55. clock clockwork.Clock
  56. }
  57. // proxyFuncFromEnv builds a proxy function if the appropriate environment
  58. // variable is set. It performs basic sanitization of the environment variable
  59. // and returns any error encountered.
  60. func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) {
  61. proxy := os.Getenv(DiscoveryProxyEnv)
  62. if proxy == "" {
  63. return nil, nil
  64. }
  65. // Do a small amount of URL sanitization to help the user
  66. // Derived from net/http.ProxyFromEnvironment
  67. proxyURL, err := url.Parse(proxy)
  68. if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
  69. // proxy was bogus. Try prepending "http://" to it and
  70. // see if that parses correctly. If not, we ignore the
  71. // error and complain about the original one
  72. var err2 error
  73. proxyURL, err2 = url.Parse("http://" + proxy)
  74. if err2 == nil {
  75. err = nil
  76. }
  77. }
  78. if err != nil {
  79. return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
  80. }
  81. log.Printf("discovery: using proxy %q", proxyURL.String())
  82. return http.ProxyURL(proxyURL), nil
  83. }
  84. func New(durl string, id uint64, config string) (Discoverer, error) {
  85. u, err := url.Parse(durl)
  86. if err != nil {
  87. return nil, err
  88. }
  89. token := u.Path
  90. u.Path = ""
  91. pf, err := proxyFuncFromEnv()
  92. if err != nil {
  93. return nil, err
  94. }
  95. c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, u.String(), client.DefaultRequestTimeout)
  96. if err != nil {
  97. return nil, err
  98. }
  99. return &discovery{
  100. cluster: token,
  101. id: id,
  102. config: config,
  103. c: c,
  104. url: u,
  105. clock: clockwork.NewRealClock(),
  106. }, nil
  107. }
  108. func (d *discovery) Discover() (string, error) {
  109. // fast path: if the cluster is full, returns the error
  110. // do not need to register itself to the cluster in this
  111. // case.
  112. if _, _, err := d.checkCluster(); err != nil {
  113. return "", err
  114. }
  115. if err := d.createSelf(); err != nil {
  116. // Fails, even on a timeout, if createSelf times out.
  117. // TODO(barakmich): Retrying the same node might want to succeed here
  118. // (ie, createSelf should be idempotent for discovery).
  119. return "", err
  120. }
  121. nodes, size, err := d.checkCluster()
  122. if err != nil {
  123. return "", err
  124. }
  125. all, err := d.waitNodes(nodes, size)
  126. if err != nil {
  127. return "", err
  128. }
  129. return nodesToCluster(all), nil
  130. }
  131. func (d *discovery) createSelf() error {
  132. resp, err := d.c.Create(d.selfKey(), d.config, -1)
  133. if err != nil {
  134. return err
  135. }
  136. // ensure self appears on the server we connected to
  137. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  138. _, err = w.Next()
  139. return err
  140. }
  141. func (d *discovery) checkCluster() (client.Nodes, int, error) {
  142. configKey := path.Join("/", d.cluster, "_config")
  143. // find cluster size
  144. resp, err := d.c.Get(path.Join(configKey, "size"))
  145. if err != nil {
  146. if err == client.ErrKeyNoExist {
  147. return nil, 0, ErrSizeNotFound
  148. }
  149. if err == client.ErrTimeout {
  150. return d.checkClusterRetry()
  151. }
  152. return nil, 0, err
  153. }
  154. size, err := strconv.Atoi(resp.Node.Value)
  155. if err != nil {
  156. return nil, 0, ErrBadSizeKey
  157. }
  158. resp, err = d.c.Get(d.cluster)
  159. if err != nil {
  160. if err == client.ErrTimeout {
  161. return d.checkClusterRetry()
  162. }
  163. return nil, 0, err
  164. }
  165. nodes := make(client.Nodes, 0)
  166. // append non-config keys to nodes
  167. for _, n := range resp.Node.Nodes {
  168. if !(path.Base(n.Key) == path.Base(configKey)) {
  169. nodes = append(nodes, n)
  170. }
  171. }
  172. snodes := sortableNodes{nodes}
  173. sort.Sort(snodes)
  174. // find self position
  175. for i := range nodes {
  176. if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
  177. break
  178. }
  179. if i >= size-1 {
  180. return nil, size, ErrFullCluster
  181. }
  182. }
  183. return nodes, size, nil
  184. }
  185. func (d *discovery) logAndBackoffForRetry(step string) {
  186. d.retries++
  187. retryTime := time.Second * (0x1 << d.retries)
  188. log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
  189. d.clock.Sleep(retryTime)
  190. }
  191. func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
  192. if d.retries < nRetries {
  193. d.logAndBackoffForRetry("cluster status check")
  194. return d.checkCluster()
  195. }
  196. return nil, 0, ErrTooManyRetries
  197. }
  198. func (d *discovery) waitNodesRetry() (client.Nodes, error) {
  199. if d.retries < nRetries {
  200. d.logAndBackoffForRetry("waiting for other nodes")
  201. nodes, n, err := d.checkCluster()
  202. if err != nil {
  203. return nil, err
  204. }
  205. return d.waitNodes(nodes, n)
  206. }
  207. return nil, ErrTooManyRetries
  208. }
  209. func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
  210. if len(nodes) > size {
  211. nodes = nodes[:size]
  212. }
  213. w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
  214. all := make(client.Nodes, len(nodes))
  215. copy(all, nodes)
  216. for _, n := range all {
  217. if path.Base(n.Key) == path.Base(d.selfKey()) {
  218. log.Printf("discovery: found self %s in the cluster", path.Base(d.selfKey()))
  219. } else {
  220. log.Printf("discovery: found peer %s in the cluster", path.Base(n.Key))
  221. }
  222. }
  223. // wait for others
  224. for len(all) < size {
  225. log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
  226. resp, err := w.Next()
  227. if err != nil {
  228. if err == client.ErrTimeout {
  229. return d.waitNodesRetry()
  230. }
  231. return nil, err
  232. }
  233. log.Printf("discovery: found peer %s in the cluster", path.Base(resp.Node.Key))
  234. all = append(all, resp.Node)
  235. }
  236. log.Printf("discovery: found %d needed peer(s)", len(all))
  237. return all, nil
  238. }
  239. func (d *discovery) selfKey() string {
  240. return path.Join("/", d.cluster, strutil.IDAsHex(d.id))
  241. }
  242. func nodesToCluster(ns client.Nodes) string {
  243. s := make([]string, len(ns))
  244. for i, n := range ns {
  245. s[i] = n.Value
  246. }
  247. return strings.Join(s, ",")
  248. }
  249. type sortableNodes struct{ client.Nodes }
  250. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  251. func (ns sortableNodes) Less(i, j int) bool {
  252. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  253. }
  254. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }