registry.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // +build ignore
  2. package server
  3. import (
  4. "fmt"
  5. "net/url"
  6. "path"
  7. "path/filepath"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "github.com/coreos/etcd/log"
  12. "github.com/coreos/etcd/store"
  13. )
  14. // The location of the peer URL data.
  15. const RegistryKey = "/_etcd/machines"
  16. // The Registry stores URL information for nodes.
  17. type Registry struct {
  18. sync.Mutex
  19. store store.Store
  20. peers map[string]*node
  21. }
  22. // The internal storage format of the registry.
  23. type node struct {
  24. peerVersion string
  25. peerURL string
  26. url string
  27. }
  28. // Creates a new Registry.
  29. func NewRegistry(s store.Store) *Registry {
  30. return &Registry{
  31. store: s,
  32. peers: make(map[string]*node),
  33. }
  34. }
  35. // Register adds a peer to the registry.
  36. func (r *Registry) Register(name string, peerURL string, machURL string) error {
  37. // Write data to store.
  38. v := url.Values{}
  39. v.Set("raft", peerURL)
  40. v.Set("etcd", machURL)
  41. log.Debugf("Register: %s", name)
  42. if _, err := r.store.Create(path.Join(RegistryKey, name), false, v.Encode(), false, store.Permanent); err != nil {
  43. return err
  44. }
  45. r.Lock()
  46. defer r.Unlock()
  47. r.peers[name] = r.load(RegistryKey, name)
  48. return nil
  49. }
  50. // Unregister removes a peer from the registry.
  51. func (r *Registry) Unregister(name string) error {
  52. // Remove the key from the store.
  53. log.Debugf("Unregister: %s", name)
  54. _, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
  55. return err
  56. }
  57. // Count returns the number of peers in the cluster.
  58. func (r *Registry) Count() int {
  59. e, err := r.store.Get(RegistryKey, false, false)
  60. if err != nil {
  61. return 0
  62. }
  63. return len(e.Node.Nodes)
  64. }
  65. // Exists checks if a peer with the given name exists.
  66. func (r *Registry) Exists(name string) bool {
  67. e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
  68. if err != nil {
  69. return false
  70. }
  71. return (e.Node != nil)
  72. }
  73. // Retrieves the client URL for a given node by name.
  74. func (r *Registry) ClientURL(name string) (string, bool) {
  75. r.Lock()
  76. defer r.Unlock()
  77. return r.clientURL(RegistryKey, name)
  78. }
  79. func (r *Registry) clientURL(key, name string) (string, bool) {
  80. if r.peers[name] == nil {
  81. if peer := r.load(key, name); peer != nil {
  82. r.peers[name] = peer
  83. }
  84. }
  85. if peer := r.peers[name]; peer != nil {
  86. return peer.url, true
  87. }
  88. return "", false
  89. }
  90. // TODO(yichengq): have all of the code use a full URL with scheme
  91. // and remove this method
  92. // PeerHost retrieves the host part of peer URL for a given node by name.
  93. func (r *Registry) PeerHost(name string) (string, bool) {
  94. rawurl, ok := r.PeerURL(name)
  95. if ok {
  96. u, _ := url.Parse(rawurl)
  97. return u.Host, ok
  98. }
  99. return rawurl, ok
  100. }
  101. // Retrieves the peer URL for a given node by name.
  102. func (r *Registry) PeerURL(name string) (string, bool) {
  103. r.Lock()
  104. defer r.Unlock()
  105. return r.peerURL(RegistryKey, name)
  106. }
  107. func (r *Registry) peerURL(key, name string) (string, bool) {
  108. if r.peers[name] == nil {
  109. if peer := r.load(key, name); peer != nil {
  110. r.peers[name] = peer
  111. }
  112. }
  113. if peer := r.peers[name]; peer != nil {
  114. return peer.peerURL, true
  115. }
  116. return "", false
  117. }
  118. // UpdatePeerURL updates peer URL in registry
  119. func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
  120. machURL, _ := r.clientURL(RegistryKey, name)
  121. // Write data to store.
  122. v := url.Values{}
  123. v.Set("raft", peerURL)
  124. v.Set("etcd", machURL)
  125. log.Debugf("Update PeerURL: %s", name)
  126. if _, err := r.store.Update(path.Join(RegistryKey, name), v.Encode(), store.Permanent); err != nil {
  127. return err
  128. }
  129. r.Lock()
  130. defer r.Unlock()
  131. // Invalidate outdated cache.
  132. r.invalidate(name)
  133. return nil
  134. }
  135. func (r *Registry) name(key, name string) (string, bool) {
  136. return name, true
  137. }
  138. // Names returns a list of cached peer names.
  139. func (r *Registry) Names() []string {
  140. names := r.urls(RegistryKey, "", "", r.name)
  141. sort.Sort(sort.StringSlice(names))
  142. return names
  143. }
  144. // Retrieves the Client URLs for all nodes.
  145. func (r *Registry) ClientURLs(leaderName, selfName string) []string {
  146. return r.urls(RegistryKey, leaderName, selfName, r.clientURL)
  147. }
  148. // Retrieves the Peer URLs for all nodes.
  149. func (r *Registry) PeerURLs(leaderName, selfName string) []string {
  150. return r.urls(RegistryKey, leaderName, selfName, r.peerURL)
  151. }
  152. // Retrieves the URLs for all nodes using url function.
  153. func (r *Registry) urls(key, leaderName, selfName string, url func(key, name string) (string, bool)) []string {
  154. r.Lock()
  155. defer r.Unlock()
  156. // Build list including the leader and self.
  157. urls := make([]string, 0)
  158. if url, _ := url(key, leaderName); len(url) > 0 {
  159. urls = append(urls, url)
  160. }
  161. // Retrieve a list of all nodes.
  162. if e, _ := r.store.Get(key, false, false); e != nil {
  163. // Lookup the URL for each one.
  164. for _, pair := range e.Node.Nodes {
  165. _, name := filepath.Split(pair.Key)
  166. if url, _ := url(key, name); len(url) > 0 && name != leaderName {
  167. urls = append(urls, url)
  168. }
  169. }
  170. }
  171. log.Debugf("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ","))
  172. return urls
  173. }
  174. // Removes a node from the cache.
  175. func (r *Registry) Invalidate(name string) {
  176. r.Lock()
  177. defer r.Unlock()
  178. r.invalidate(name)
  179. }
  180. func (r *Registry) invalidate(name string) {
  181. delete(r.peers, name)
  182. }
  183. // Loads the given node by name from the store into the cache.
  184. func (r *Registry) load(key, name string) *node {
  185. if name == "" {
  186. return nil
  187. }
  188. // Retrieve from store.
  189. e, err := r.store.Get(path.Join(key, name), false, false)
  190. if err != nil {
  191. return nil
  192. }
  193. // Parse as a query string.
  194. m, err := url.ParseQuery(*e.Node.Value)
  195. if err != nil {
  196. panic(fmt.Sprintf("Failed to parse peers entry: %s", name))
  197. }
  198. // Create node.
  199. return &node{
  200. url: m["etcd"][0],
  201. peerURL: m["raft"][0],
  202. }
  203. }