discovery.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "net/url"
  19. "path"
  20. "strings"
  21. "time"
  22. etcdErr "github.com/coreos/etcd/error"
  23. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  24. )
  // Constants shared by every step of the discovery exchange.
  const (
  	// stateKey is the key name, joined onto the discovery prefix, that
  	// discover tries to create.
  	stateKey = "_state"

  	// startedState is the value written to stateKey.
  	startedState = "started"

  	// defaultTTL is the TTL, in seconds, applied to this member's
  	// registration key: one week.
  	defaultTTL = 7 * 24 * 60 * 60
  )
  30. type discoverer struct {
  31. client *etcd.Client
  32. name string
  33. addr string
  34. prefix string
  35. }
  36. func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer {
  37. d := &discoverer{name: name, addr: raftPubAddr}
  38. // prefix is prepended to all keys for this discovery
  39. d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
  40. // keep the old path in case we need to set the KeyPrefix below
  41. oldPath := u.Path
  42. u.Path = ""
  43. // Connect to a scheme://host not a full URL with path
  44. d.client = etcd.NewClient([]string{u.String()})
  45. if !strings.HasPrefix(oldPath, "/v2/keys") {
  46. d.client.SetKeyPrefix("")
  47. }
  48. return d
  49. }
  50. func (d *discoverer) discover() ([]string, error) {
  51. log.Printf("discoverer name=%s target=\"%q\" prefix=%s\n", d.name, d.client.GetCluster(), d.prefix)
  52. if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
  53. return nil, fmt.Errorf("discovery service error: %v", err)
  54. }
  55. // Attempt to take the leadership role, if there is no error we are it!
  56. resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0)
  57. // Bail out on unexpected errors
  58. if err != nil {
  59. if clientErr, ok := err.(*etcd.EtcdError); !ok || clientErr.ErrorCode != etcdErr.EcodeNodeExist {
  60. return nil, fmt.Errorf("discovery service error: %v", err)
  61. }
  62. }
  63. // If we got a response then the CAS was successful, we are leader
  64. if resp != nil && resp.Node.Value == startedState {
  65. // We are the leader, we have no peers
  66. return nil, nil
  67. }
  68. // Fall through to finding the other discovery peers
  69. return d.findPeers()
  70. }
  71. func (d *discoverer) findPeers() (peers []string, err error) {
  72. resp, err := d.client.Get(path.Join(d.prefix), false, true)
  73. if err != nil {
  74. return nil, fmt.Errorf("discovery service error: %v", err)
  75. }
  76. node := resp.Node
  77. if node == nil {
  78. return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
  79. }
  80. for _, n := range node.Nodes {
  81. // Skip our own entry in the list, there is no point
  82. if strings.HasSuffix(n.Key, "/"+d.name) {
  83. continue
  84. }
  85. peers = append(peers, n.Value)
  86. }
  87. if len(peers) == 0 {
  88. return nil, errors.New("Discovery found an initialized cluster but no reachable peers are registered.")
  89. }
  90. return
  91. }
  92. func (d *discoverer) heartbeat(stopc <-chan struct{}) {
  93. // In case of errors we should attempt to heartbeat fairly frequently
  94. heartbeatInterval := defaultTTL / 8
  95. ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval))
  96. defer ticker.Stop()
  97. for {
  98. if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
  99. log.Println("discoverer heartbeatErr=\"%v\"", err)
  100. }
  101. select {
  102. case <-ticker.C:
  103. case <-stopc:
  104. return
  105. }
  106. }
  107. }