discovery.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "math"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  28. "github.com/coreos/etcd/client"
  29. "github.com/coreos/etcd/pkg/types"
  30. )
  // Sentinel errors reported by the discovery package.
  var (
  	// ErrInvalidURL carries the message "discovery: invalid URL".
  	ErrInvalidURL = errors.New("discovery: invalid URL")
  	// ErrBadSizeKey is returned when the value stored under the cluster's
  	// size key cannot be parsed as an integer.
  	ErrBadSizeKey = errors.New("discovery: size key is bad")
  	// ErrSizeNotFound is returned when the cluster's size key does not exist.
  	ErrSizeNotFound = errors.New("discovery: size key not found")
  	// ErrTokenNotFound carries the message "discovery: token not found".
  	ErrTokenNotFound = errors.New("discovery: token not found")
  	// ErrDuplicateID is returned when this member's key already exists in
  	// the discovery cluster.
  	ErrDuplicateID = errors.New("discovery: found duplicate id")
  	// ErrFullCluster is returned when the cluster already holds its full
  	// complement of members and this member is not among them.
  	ErrFullCluster = errors.New("discovery: cluster is full")
  	// ErrTooManyRetries is returned once the retry budget is exhausted.
  	ErrTooManyRetries = errors.New("discovery: too many retries")
  )

  // nRetries is the number of retries discovery will attempt before giving up
  // and erroring out.
  var nRetries = uint(math.MaxUint32)
  44. // JoinCluster will connect to the discovery service at the given url, and
  45. // register the server represented by the given id and config to the cluster
  46. func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, error) {
  47. d, err := newDiscovery(durl, dproxyurl, id)
  48. if err != nil {
  49. return "", err
  50. }
  51. return d.joinCluster(config)
  52. }
  53. // GetCluster will connect to the discovery service at the given url and
  54. // retrieve a string describing the cluster
  55. func GetCluster(durl, dproxyurl string) (string, error) {
  56. d, err := newDiscovery(durl, dproxyurl, 0)
  57. if err != nil {
  58. return "", err
  59. }
  60. return d.getCluster()
  61. }
  62. type discovery struct {
  63. cluster string
  64. id types.ID
  65. c client.KeysAPI
  66. retries uint
  67. url *url.URL
  68. clock clockwork.Clock
  69. }
  // newProxyFunc builds a proxy function from the given string, which should
  // represent a URL that can be used as a proxy. It performs basic
  // sanitization of the URL and returns any error encountered.
  //
  // An empty proxy yields a nil function and a nil error. If proxy does not
  // parse, or parses with a scheme that does not start with "http", the string
  // is re-parsed with "http://" prepended and that result is used when valid.
  // When neither form yields a usable URL an error is returned.
  func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) {
  	if proxy == "" {
  		return nil, nil
  	}
  	// Do a small amount of URL sanitization to help the user
  	// Derived from net/http.ProxyFromEnvironment
  	proxyURL, err := url.Parse(proxy)
  	if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
  		// proxy was bogus. Try prepending "http://" to it and see if that
  		// parses correctly. The retry is kept in its own variables so that a
  		// failed retry can never leave proxyURL nil while err is nil.
  		fallbackURL, fallbackErr := url.Parse("http://" + proxy)
  		switch {
  		case fallbackErr == nil:
  			proxyURL, err = fallbackURL, nil
  		case err == nil:
  			// The original parsed but has no usable scheme, and the
  			// "http://" form is invalid: there is no original error to
  			// complain about, so report the retry's error.
  			err = fallbackErr
  		}
  		// Otherwise both parses failed; complain about the original error.
  	}
  	if err != nil {
  		return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
  	}

  	log.Printf("discovery: using proxy %q", proxyURL.String())
  	return http.ProxyURL(proxyURL), nil
  }
  96. func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) {
  97. u, err := url.Parse(durl)
  98. if err != nil {
  99. return nil, err
  100. }
  101. token := u.Path
  102. u.Path = ""
  103. pf, err := newProxyFunc(dproxyurl)
  104. if err != nil {
  105. return nil, err
  106. }
  107. c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, []string{u.String()})
  108. if err != nil {
  109. return nil, err
  110. }
  111. dc := client.NewDiscoveryKeysAPI(c)
  112. return &discovery{
  113. cluster: token,
  114. c: dc,
  115. id: id,
  116. url: u,
  117. clock: clockwork.NewRealClock(),
  118. }, nil
  119. }
  120. func (d *discovery) joinCluster(config string) (string, error) {
  121. // fast path: if the cluster is full, return the error
  122. // do not need to register to the cluster in this case.
  123. if _, _, _, err := d.checkCluster(); err != nil {
  124. return "", err
  125. }
  126. if err := d.createSelf(config); err != nil {
  127. // Fails, even on a timeout, if createSelf times out.
  128. // TODO(barakmich): Retrying the same node might want to succeed here
  129. // (ie, createSelf should be idempotent for discovery).
  130. return "", err
  131. }
  132. nodes, size, index, err := d.checkCluster()
  133. if err != nil {
  134. return "", err
  135. }
  136. all, err := d.waitNodes(nodes, size, index)
  137. if err != nil {
  138. return "", err
  139. }
  140. return nodesToCluster(all), nil
  141. }
  142. func (d *discovery) getCluster() (string, error) {
  143. nodes, size, index, err := d.checkCluster()
  144. if err != nil {
  145. if err == ErrFullCluster {
  146. return nodesToCluster(nodes), nil
  147. }
  148. return "", err
  149. }
  150. all, err := d.waitNodes(nodes, size, index)
  151. if err != nil {
  152. return "", err
  153. }
  154. return nodesToCluster(all), nil
  155. }
  156. func (d *discovery) createSelf(contents string) error {
  157. ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  158. resp, err := d.c.Create(ctx, d.selfKey(), contents, -1)
  159. cancel()
  160. if err != nil {
  161. if err == client.ErrKeyExists {
  162. return ErrDuplicateID
  163. }
  164. return err
  165. }
  166. // ensure self appears on the server we connected to
  167. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  168. _, err = w.Next(context.Background())
  169. return err
  170. }
  171. func (d *discovery) checkCluster() (client.Nodes, int, uint64, error) {
  172. configKey := path.Join("/", d.cluster, "_config")
  173. ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  174. // find cluster size
  175. resp, err := d.c.Get(ctx, path.Join(configKey, "size"))
  176. cancel()
  177. if err != nil {
  178. if err == client.ErrKeyNoExist {
  179. return nil, 0, 0, ErrSizeNotFound
  180. }
  181. if err == client.ErrTimeout {
  182. return d.checkClusterRetry()
  183. }
  184. return nil, 0, 0, err
  185. }
  186. size, err := strconv.Atoi(resp.Node.Value)
  187. if err != nil {
  188. return nil, 0, 0, ErrBadSizeKey
  189. }
  190. ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  191. resp, err = d.c.Get(ctx, d.cluster)
  192. cancel()
  193. if err != nil {
  194. if err == client.ErrTimeout {
  195. return d.checkClusterRetry()
  196. }
  197. return nil, 0, 0, err
  198. }
  199. nodes := make(client.Nodes, 0)
  200. // append non-config keys to nodes
  201. for _, n := range resp.Node.Nodes {
  202. if !(path.Base(n.Key) == path.Base(configKey)) {
  203. nodes = append(nodes, n)
  204. }
  205. }
  206. snodes := sortableNodes{nodes}
  207. sort.Sort(snodes)
  208. // find self position
  209. for i := range nodes {
  210. if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
  211. break
  212. }
  213. if i >= size-1 {
  214. return nodes[:size], size, resp.Index, ErrFullCluster
  215. }
  216. }
  217. return nodes, size, resp.Index, nil
  218. }
  219. func (d *discovery) logAndBackoffForRetry(step string) {
  220. d.retries++
  221. retryTime := time.Second * (0x1 << d.retries)
  222. log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
  223. d.clock.Sleep(retryTime)
  224. }
  225. func (d *discovery) checkClusterRetry() (client.Nodes, int, uint64, error) {
  226. if d.retries < nRetries {
  227. d.logAndBackoffForRetry("cluster status check")
  228. return d.checkCluster()
  229. }
  230. return nil, 0, 0, ErrTooManyRetries
  231. }
  232. func (d *discovery) waitNodesRetry() (client.Nodes, error) {
  233. if d.retries < nRetries {
  234. d.logAndBackoffForRetry("waiting for other nodes")
  235. nodes, n, index, err := d.checkCluster()
  236. if err != nil {
  237. return nil, err
  238. }
  239. return d.waitNodes(nodes, n, index)
  240. }
  241. return nil, ErrTooManyRetries
  242. }
  243. func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (client.Nodes, error) {
  244. if len(nodes) > size {
  245. nodes = nodes[:size]
  246. }
  247. // watch from the next index
  248. w := d.c.RecursiveWatch(d.cluster, index+1)
  249. all := make(client.Nodes, len(nodes))
  250. copy(all, nodes)
  251. for _, n := range all {
  252. if path.Base(n.Key) == path.Base(d.selfKey()) {
  253. log.Printf("discovery: found self %s in the cluster", path.Base(d.selfKey()))
  254. } else {
  255. log.Printf("discovery: found peer %s in the cluster", path.Base(n.Key))
  256. }
  257. }
  258. // wait for others
  259. for len(all) < size {
  260. log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
  261. resp, err := w.Next(context.Background())
  262. if err != nil {
  263. if err == client.ErrTimeout {
  264. return d.waitNodesRetry()
  265. }
  266. return nil, err
  267. }
  268. log.Printf("discovery: found peer %s in the cluster", path.Base(resp.Node.Key))
  269. all = append(all, resp.Node)
  270. }
  271. log.Printf("discovery: found %d needed peer(s)", len(all))
  272. return all, nil
  273. }
  274. func (d *discovery) selfKey() string {
  275. return path.Join("/", d.cluster, d.id.String())
  276. }
  277. func nodesToCluster(ns client.Nodes) string {
  278. s := make([]string, len(ns))
  279. for i, n := range ns {
  280. s[i] = n.Value
  281. }
  282. return strings.Join(s, ",")
  283. }
  284. type sortableNodes struct{ client.Nodes }
  285. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  286. func (ns sortableNodes) Less(i, j int) bool {
  287. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  288. }
  289. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }