discovery.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package discovery
  2. import (
  3. "errors"
  4. "fmt"
  5. "path"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "github.com/coreos/etcd/client"
  10. "github.com/coreos/etcd/etcdserver/etcdhttp"
  11. )
  // Sentinel errors reported by the discovery package.
  var (
  	// ErrInvalidURL reports an invalid discovery URL.
  	ErrInvalidURL = errors.New("discovery: invalid URL")

  	// ErrBadSizeKey reports that the value stored under the cluster's
  	// size key is not a usable cluster size (for example, not an integer).
  	ErrBadSizeKey = errors.New("discovery: size key is bad")

  	// ErrSizeNotFound reports that the cluster's size key does not exist.
  	ErrSizeNotFound = errors.New("discovery: size key not found")

  	// ErrTokenNotFound reports that the discovery token was not found.
  	ErrTokenNotFound = errors.New("discovery: token not found")

  	// ErrDuplicateID reports that a duplicate member id was found.
  	ErrDuplicateID = errors.New("discovery: found duplicate id")

  	// ErrFullCluster reports that all slots of the cluster are already
  	// taken by other members.
  	ErrFullCluster = errors.New("discovery: cluster is full")
  )
  20. type discovery struct {
  21. cluster string
  22. id int64
  23. ctx []byte
  24. c client.Client
  25. }
  26. func (d *discovery) discover() (*etcdhttp.Peers, error) {
  27. // fast path: if the cluster is full, returns the error
  28. // do not need to register itself to the cluster in this
  29. // case.
  30. if _, _, err := d.checkCluster(); err != nil {
  31. return nil, err
  32. }
  33. if err := d.createSelf(); err != nil {
  34. return nil, err
  35. }
  36. nodes, size, err := d.checkCluster()
  37. if err != nil {
  38. return nil, err
  39. }
  40. all, err := d.waitNodes(nodes, size)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return nodesToPeers(all)
  45. }
  46. func (d *discovery) createSelf() error {
  47. resp, err := d.c.Create(d.selfKey(), string(d.ctx), 0)
  48. if err != nil {
  49. return err
  50. }
  51. // ensure self appears on the server we connected to
  52. w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
  53. _, err = w.Next()
  54. return err
  55. }
  56. func (d *discovery) checkCluster() (client.Nodes, int, error) {
  57. configKey := path.Join("/", d.cluster, "_config")
  58. // find cluster size
  59. resp, err := d.c.Get(path.Join(configKey, "size"))
  60. if err != nil {
  61. if err == client.ErrKeyNoExist {
  62. return nil, 0, ErrSizeNotFound
  63. }
  64. return nil, 0, err
  65. }
  66. size, err := strconv.Atoi(resp.Node.Value)
  67. if err != nil {
  68. return nil, 0, ErrBadSizeKey
  69. }
  70. resp, err = d.c.Get(d.cluster)
  71. if err != nil {
  72. return nil, 0, err
  73. }
  74. nodes := make(client.Nodes, 0)
  75. // append non-config keys to nodes
  76. for _, n := range resp.Node.Nodes {
  77. if !strings.HasPrefix(n.Key, configKey) {
  78. nodes = append(nodes, n)
  79. }
  80. }
  81. snodes := sortableNodes{nodes}
  82. sort.Sort(snodes)
  83. // find self position
  84. for i := range nodes {
  85. if nodes[i].Key == d.selfKey() {
  86. break
  87. }
  88. if i >= size-1 {
  89. return nil, size, ErrFullCluster
  90. }
  91. }
  92. return nodes, size, nil
  93. }
  94. func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
  95. if len(nodes) > size {
  96. nodes = nodes[:size]
  97. }
  98. w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex)
  99. all := make(client.Nodes, len(nodes))
  100. copy(all, nodes)
  101. // wait for others
  102. for len(all) < size {
  103. resp, err := w.Next()
  104. if err != nil {
  105. return nil, err
  106. }
  107. all = append(all, resp.Node)
  108. }
  109. return all, nil
  110. }
  111. func (d *discovery) selfKey() string {
  112. return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
  113. }
  114. func nodesToPeers(ns client.Nodes) (*etcdhttp.Peers, error) {
  115. s := make([]string, len(ns))
  116. for i, n := range ns {
  117. s[i] = n.Value
  118. }
  119. var peers etcdhttp.Peers
  120. if err := peers.Set(strings.Join(s, "&")); err != nil {
  121. return nil, err
  122. }
  123. return &peers, nil
  124. }
  125. type sortableNodes struct{ client.Nodes }
  126. func (ns sortableNodes) Len() int { return len(ns.Nodes) }
  127. func (ns sortableNodes) Less(i, j int) bool {
  128. return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
  129. }
  130. func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }