discovery.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  27. "github.com/coreos/etcd/client"
  28. "github.com/coreos/etcd/pkg/types"
  29. )
  // Sentinel errors returned by the discovery package. Callers compare against
  // these values directly.
  var (
  	// ErrInvalidURL is not returned by any code visible in this file.
  	// NOTE(review): presumably meant for a malformed discovery URL — confirm.
  	ErrInvalidURL = errors.New("discovery: invalid URL")
  	// ErrBadSizeKey is returned by checkCluster when the value stored under
  	// the cluster's _config/size key cannot be parsed as an integer.
  	ErrBadSizeKey = errors.New("discovery: size key is bad")
  	// ErrSizeNotFound is returned by checkCluster when the _config/size key
  	// does not exist for the cluster token.
  	ErrSizeNotFound = errors.New("discovery: size key not found")
  	// ErrTokenNotFound is not returned by any code visible in this file.
  	// NOTE(review): presumably meant for an unknown cluster token — confirm.
  	ErrTokenNotFound = errors.New("discovery: token not found")
  	// ErrDuplicateID is not returned by any code visible in this file.
  	// NOTE(review): presumably meant for two members registering the same id — confirm.
  	ErrDuplicateID = errors.New("discovery: found duplicate id")
  	// ErrFullCluster is returned by checkCluster when "size" other nodes were
  	// registered before this member, so there is no slot left for it.
  	ErrFullCluster = errors.New("discovery: cluster is full")
  	// ErrTooManyRetries is returned once nRetries timed-out requests have
  	// already been retried and another timeout occurs.
  	ErrTooManyRetries = errors.New("discovery: too many retries")
  )
  const (
  	// nRetries is the number of retries discovery will attempt after a
  	// request times out before giving up with ErrTooManyRetries. The retry
  	// counter lives on the discovery value and is shared by the cluster
  	// check and the node wait; nothing in this file resets it. Each retry
  	// sleeps for 2^retries seconds first (see logAndBackoffForRetry).
  	nRetries = uint(3)
  )
  43. // JoinCluster will connect to the discovery service at the given url, and
  44. // register the server represented by the given id and config to the cluster
  45. func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, error) {
  46. d, err := newDiscovery(durl, dproxyurl, id)
  47. if err != nil {
  48. return "", err
  49. }
  50. return d.joinCluster(config)
  51. }
  52. // GetCluster will connect to the discovery service at the given url and
  53. // retrieve a string describing the cluster
  54. func GetCluster(durl, dproxyurl string) (string, error) {
  55. d, err := newDiscovery(durl, dproxyurl, 0)
  56. if err != nil {
  57. return "", err
  58. }
  59. return d.getCluster()
  60. }
  // discovery holds the state needed to talk to one discovery endpoint on
  // behalf of one member.
  type discovery struct {
  	// cluster is the path component of the discovery URL (the cluster
  	// token); all keys used by this type are built underneath it.
  	cluster string
  	// id identifies this member; its string form is the last element of
  	// the member's own key (see selfKey).
  	id types.ID
  	// c is the keys API used for every get, create and watch request.
  	c client.KeysAPI
  	// retries counts the timeout retries performed so far; it is
  	// incremented by logAndBackoffForRetry and compared against nRetries.
  	retries uint
  	// url is the discovery URL with its path cleared; it is only used in
  	// log output.
  	url *url.URL
  	// clock provides Sleep for the retry backoff.
  	clock clockwork.Clock
  }
  // newProxyFunc builds a proxy function from the given string, which should
  // represent a URL that can be used as a proxy. It performs basic
  // sanitization of the URL and returns any error encountered.
  //
  // An empty proxy string yields a nil function and a nil error. A string that
  // does not parse, or whose scheme does not start with "http", is retried with
  // "http://" prepended. If that retry also fails an error is returned; the
  // returned function is never built from a nil URL.
  func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) {
  	if proxy == "" {
  		return nil, nil
  	}
  	// Do a small amount of URL sanitization to help the user.
  	// Derived from net/http.ProxyFromEnvironment.
  	proxyURL, err := url.Parse(proxy)
  	if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
  		// proxy was bogus. Try prepending "http://" to it and see if that
  		// parses correctly. The result is kept in a separate variable so a
  		// failed retry cannot replace proxyURL with nil.
  		fixedURL, err2 := url.Parse("http://" + proxy)
  		switch {
  		case err2 == nil:
  			proxyURL, err = fixedURL, nil
  		case err == nil:
  			// The input parsed (e.g. "host:80a" reads as scheme "host")
  			// but is not an http URL and could not be repaired, so there
  			// is no original error to complain about: report the retry's.
  			err = err2
  		}
  		// Otherwise both parses failed; complain about the original error.
  	}
  	if err != nil {
  		return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
  	}

  	log.Printf("discovery: using proxy %q", proxyURL.String())
  	return http.ProxyURL(proxyURL), nil
  }
  95. func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) {
  96. u, err := url.Parse(durl)
  97. if err != nil {
  98. return nil, err
  99. }
  100. token := u.Path
  101. u.Path = ""
  102. pf, err := newProxyFunc(dproxyurl)
  103. if err != nil {
  104. return nil, err
  105. }
  106. c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, []string{u.String()})
  107. if err != nil {
  108. return nil, err
  109. }
  110. dc := client.NewDiscoveryKeysAPI(c)
  111. return &discovery{
  112. cluster: token,
  113. c: dc,
  114. id: id,
  115. url: u,
  116. clock: clockwork.NewRealClock(),
  117. }, nil
  118. }
  119. func (d *discovery) joinCluster(config string) (string, error) {
  120. // fast path: if the cluster is full, return the error
  121. // do not need to register to the cluster in this case.
  122. if _, _, err := d.checkCluster(); err != nil {
  123. return "", err
  124. }
  125. if err := d.createSelf(config); err != nil {
  126. // Fails, even on a timeout, if createSelf times out.
  127. // TODO(barakmich): Retrying the same node might want to succeed here
  128. // (ie, createSelf should be idempotent for discovery).
  129. return "", err
  130. }
  131. nodes, size, err := d.checkCluster()
  132. if err != nil {
  133. return "", err
  134. }
  135. all, err := d.waitNodes(nodes, size)
  136. if err != nil {
  137. return "", err
  138. }
  139. return nodesToCluster(all), nil
  140. }
  141. func (d *discovery) getCluster() (string, error) {
  142. nodes, size, err := d.checkCluster()
  143. if err != nil {
  144. if err == ErrFullCluster {
  145. return nodesToCluster(nodes), nil
  146. }
  147. return "", err
  148. }
  149. all, err := d.waitNodes(nodes, size)
  150. if err != nil {
  151. return "", err
  152. }
  153. return nodesToCluster(all), nil
  154. }
  155. func (d *discovery) createSelf(contents string) error {
  156. ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  157. resp, err := d.c.Create(ctx, d.selfKey(), contents, -1)
  158. cancel()
  159. if err != nil {
  160. return err
  161. }
  162. // ensure self appears on the server we connected to
  163. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  164. _, err = w.Next(context.Background())
  165. return err
  166. }
  167. func (d *discovery) checkCluster() (client.Nodes, int, error) {
  168. configKey := path.Join("/", d.cluster, "_config")
  169. ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  170. // find cluster size
  171. resp, err := d.c.Get(ctx, path.Join(configKey, "size"))
  172. cancel()
  173. if err != nil {
  174. if err == client.ErrKeyNoExist {
  175. return nil, 0, ErrSizeNotFound
  176. }
  177. if err == client.ErrTimeout {
  178. return d.checkClusterRetry()
  179. }
  180. return nil, 0, err
  181. }
  182. size, err := strconv.Atoi(resp.Node.Value)
  183. if err != nil {
  184. return nil, 0, ErrBadSizeKey
  185. }
  186. ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  187. resp, err = d.c.Get(ctx, d.cluster)
  188. cancel()
  189. if err != nil {
  190. if err == client.ErrTimeout {
  191. return d.checkClusterRetry()
  192. }
  193. return nil, 0, err
  194. }
  195. nodes := make(client.Nodes, 0)
  196. // append non-config keys to nodes
  197. for _, n := range resp.Node.Nodes {
  198. if !(path.Base(n.Key) == path.Base(configKey)) {
  199. nodes = append(nodes, n)
  200. }
  201. }
  202. snodes := sortableNodes{nodes}
  203. sort.Sort(snodes)
  204. // find self position
  205. for i := range nodes {
  206. if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
  207. break
  208. }
  209. if i >= size-1 {
  210. return nodes[:size], size, ErrFullCluster
  211. }
  212. }
  213. return nodes, size, nil
  214. }
  215. func (d *discovery) logAndBackoffForRetry(step string) {
  216. d.retries++
  217. retryTime := time.Second * (0x1 << d.retries)
  218. log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
  219. d.clock.Sleep(retryTime)
  220. }
  221. func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
  222. if d.retries < nRetries {
  223. d.logAndBackoffForRetry("cluster status check")
  224. return d.checkCluster()
  225. }
  226. return nil, 0, ErrTooManyRetries
  227. }
  228. func (d *discovery) waitNodesRetry() (client.Nodes, error) {
  229. if d.retries < nRetries {
  230. d.logAndBackoffForRetry("waiting for other nodes")
  231. nodes, n, err := d.checkCluster()
  232. if err != nil {
  233. return nil, err
  234. }
  235. return d.waitNodes(nodes, n)
  236. }
  237. return nil, ErrTooManyRetries
  238. }
  239. func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
  240. if len(nodes) > size {
  241. nodes = nodes[:size]
  242. }
  243. w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
  244. all := make(client.Nodes, len(nodes))
  245. copy(all, nodes)
  246. for _, n := range all {
  247. if path.Base(n.Key) == path.Base(d.selfKey()) {
  248. log.Printf("discovery: found self %s in the cluster", path.Base(d.selfKey()))
  249. } else {
  250. log.Printf("discovery: found peer %s in the cluster", path.Base(n.Key))
  251. }
  252. }
  253. // wait for others
  254. for len(all) < size {
  255. log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
  256. resp, err := w.Next(context.Background())
  257. if err != nil {
  258. if err == client.ErrTimeout {
  259. return d.waitNodesRetry()
  260. }
  261. return nil, err
  262. }
  263. log.Printf("discovery: found peer %s in the cluster", path.Base(resp.Node.Key))
  264. all = append(all, resp.Node)
  265. }
  266. log.Printf("discovery: found %d needed peer(s)", len(all))
  267. return all, nil
  268. }
  269. func (d *discovery) selfKey() string {
  270. return path.Join("/", d.cluster, d.id.String())
  271. }
  272. func nodesToCluster(ns client.Nodes) string {
  273. s := make([]string, len(ns))
  274. for i, n := range ns {
  275. s[i] = n.Value
  276. }
  277. return strings.Join(s, ",")
  278. }
  279. type sortableNodes struct{ client.Nodes }
  280. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  281. func (ns sortableNodes) Less(i, j int) bool {
  282. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  283. }
  284. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }