discovery.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package discovery
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/url"
  6. "path"
  7. "strings"
  8. "time"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  12. )
  13. const (
  14. stateKey = "_state"
  15. startedState = "started"
  16. defaultTTL = 604800 // One week TTL
  17. )
  18. type Discoverer struct {
  19. client *etcd.Client
  20. name string
  21. peer string
  22. prefix string
  23. discoveryURL string
  24. }
  25. var defaultDiscoverer *Discoverer
  26. func init() {
  27. defaultDiscoverer = &Discoverer{}
  28. }
  29. func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) (peers []string, err error) {
  30. d.name = name
  31. d.peer = peer
  32. d.discoveryURL = discoveryURL
  33. u, err := url.Parse(discoveryURL)
  34. if err != nil {
  35. return
  36. }
  37. // prefix is prepended to all keys for this discovery
  38. d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
  39. // keep the old path in case we need to set the KeyPrefix below
  40. oldPath := u.Path
  41. u.Path = ""
  42. // Connect to a scheme://host not a full URL with path
  43. log.Infof("Discovery via %s using prefix %s.", u.String(), d.prefix)
  44. d.client = etcd.NewClient([]string{u.String()})
  45. if !strings.HasPrefix(oldPath, "/v2/keys") {
  46. d.client.SetKeyPrefix("")
  47. }
  48. // Register this machine first and announce that we are a member of
  49. // this cluster
  50. err = d.heartbeat()
  51. if err != nil {
  52. return
  53. }
  54. // Start the very slow heartbeat to the cluster now in anticipation
  55. // that everything is going to go alright now
  56. startRoutine(func() { d.startHeartbeat(closeChan) })
  57. // Attempt to take the leadership role, if there is no error we are it!
  58. resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0)
  59. // Bail out on unexpected errors
  60. if err != nil {
  61. if clientErr, ok := err.(*etcd.EtcdError); !ok || clientErr.ErrorCode != etcdErr.EcodeNodeExist {
  62. return nil, err
  63. }
  64. }
  65. // If we got a response then the CAS was successful, we are leader
  66. if resp != nil && resp.Node.Value == startedState {
  67. // We are the leader, we have no peers
  68. log.Infof("Discovery _state was empty, so this machine is the initial leader.")
  69. return nil, nil
  70. }
  71. // Fall through to finding the other discovery peers
  72. return d.findPeers()
  73. }
  74. func (d *Discoverer) findPeers() (peers []string, err error) {
  75. resp, err := d.client.Get(path.Join(d.prefix), false, true)
  76. if err != nil {
  77. return nil, err
  78. }
  79. node := resp.Node
  80. if node == nil {
  81. return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
  82. }
  83. for _, n := range node.Nodes {
  84. // Skip our own entry in the list, there is no point
  85. if strings.HasSuffix(n.Key, "/"+d.name) {
  86. continue
  87. }
  88. peers = append(peers, n.Value)
  89. }
  90. if len(peers) == 0 {
  91. return nil, errors.New("Discovery found an initialized cluster but no reachable peers are registered.")
  92. }
  93. log.Infof("Discovery found peers %v", peers)
  94. return
  95. }
  96. func (d *Discoverer) startHeartbeat(closeChan <-chan bool) {
  97. // In case of errors we should attempt to heartbeat fairly frequently
  98. heartbeatInterval := defaultTTL / 8
  99. ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval))
  100. defer ticker.Stop()
  101. for {
  102. select {
  103. case <-ticker.C:
  104. err := d.heartbeat()
  105. if err != nil {
  106. log.Warnf("Discovery heartbeat failed: %v", err)
  107. }
  108. case <-closeChan:
  109. return
  110. }
  111. }
  112. }
  113. func (d *Discoverer) heartbeat() error {
  114. _, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL)
  115. return err
  116. }
  117. func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) ([]string, error) {
  118. return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, startRoutine)
  119. }