cluster.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "crypto/sha1"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "net/url"
  22. "path"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "github.com/coreos/etcd/pkg/flags"
  27. "github.com/coreos/etcd/pkg/netutil"
  28. "github.com/coreos/etcd/pkg/types"
  29. "github.com/coreos/etcd/raft/raftpb"
  30. "github.com/coreos/etcd/store"
  31. )
  // raftAttributesSuffix is the key, joined under a member's store directory
  // (memberStoreKey), that holds the member's JSON-encoded RaftAttributes.
  const raftAttributesSuffix = "raftAttributes"

  // attributesSuffix is the sibling key for a member's Attributes.
  // NOTE(review): not referenced by the functions in this file; presumably
  // used by the member (de)serialization code elsewhere in the package.
  const attributesSuffix = "attributes"
  36. type ClusterInfo interface {
  37. // ID returns the cluster ID
  38. ID() types.ID
  39. // ClientURLs returns an aggregate set of all URLs on which this
  40. // cluster is listening for client requests
  41. ClientURLs() []string
  42. // Members returns a slice of members sorted by their ID
  43. Members() []*Member
  44. // Member retrieves a particular member based on ID, or nil if the
  45. // member does not exist in the cluster
  46. Member(id types.ID) *Member
  47. // IsIDRemoved checks whether the given ID has been removed from this
  48. // cluster at some point in the past
  49. IsIDRemoved(id types.ID) bool
  50. }
  51. // Cluster is a list of Members that belong to the same raft cluster
  52. type Cluster struct {
  53. id types.ID
  54. token string
  55. store store.Store
  56. sync.Mutex // guards members and removed map
  57. members map[types.ID]*Member
  58. // removed contains the ids of removed members in the cluster.
  59. // removed id cannot be reused.
  60. removed map[types.ID]bool
  61. }
  62. // NewClusterFromString returns a Cluster instantiated from the given cluster token
  63. // and cluster string, by parsing members from a set of discovery-formatted
  64. // names-to-IPs, like:
  65. // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
  66. func NewClusterFromString(token string, cluster string) (*Cluster, error) {
  67. c := newCluster(token)
  68. v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
  69. if err != nil {
  70. return nil, err
  71. }
  72. for name, urls := range v {
  73. if len(urls) == 0 || urls[0] == "" {
  74. return nil, fmt.Errorf("Empty URL given for %q", name)
  75. }
  76. purls := &flags.URLsValue{}
  77. if err := purls.Set(strings.Join(urls, ",")); err != nil {
  78. return nil, err
  79. }
  80. m := NewMember(name, types.URLs(*purls), c.token, nil)
  81. if _, ok := c.members[m.ID]; ok {
  82. return nil, fmt.Errorf("Member exists with identical ID %v", m)
  83. }
  84. c.members[m.ID] = m
  85. }
  86. c.genID()
  87. return c, nil
  88. }
  89. func NewClusterFromStore(token string, st store.Store) *Cluster {
  90. c := newCluster(token)
  91. c.store = st
  92. c.members, c.removed = membersFromStore(c.store)
  93. return c
  94. }
  95. func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
  96. c := newCluster(token)
  97. c.id = id
  98. for _, m := range membs {
  99. c.members[m.ID] = m
  100. }
  101. return c
  102. }
  103. func newCluster(token string) *Cluster {
  104. return &Cluster{
  105. token: token,
  106. members: make(map[types.ID]*Member),
  107. removed: make(map[types.ID]bool),
  108. }
  109. }
  110. func (c *Cluster) ID() types.ID { return c.id }
  111. func (c *Cluster) Members() []*Member {
  112. c.Lock()
  113. defer c.Unlock()
  114. var sms SortableMemberSlice
  115. for _, m := range c.members {
  116. sms = append(sms, m.Clone())
  117. }
  118. sort.Sort(sms)
  119. return []*Member(sms)
  120. }
  121. func (c *Cluster) Member(id types.ID) *Member {
  122. c.Lock()
  123. defer c.Unlock()
  124. return c.members[id].Clone()
  125. }
  126. // MemberByName returns a Member with the given name if exists.
  127. // If more than one member has the given name, it will panic.
  128. func (c *Cluster) MemberByName(name string) *Member {
  129. c.Lock()
  130. defer c.Unlock()
  131. var memb *Member
  132. for _, m := range c.members {
  133. if m.Name == name {
  134. if memb != nil {
  135. log.Panicf("two members with the given name %q exist", name)
  136. }
  137. memb = m
  138. }
  139. }
  140. return memb.Clone()
  141. }
  142. func (c *Cluster) MemberIDs() []types.ID {
  143. c.Lock()
  144. defer c.Unlock()
  145. var ids []types.ID
  146. for _, m := range c.members {
  147. ids = append(ids, m.ID)
  148. }
  149. sort.Sort(types.IDSlice(ids))
  150. return ids
  151. }
  152. func (c *Cluster) IsIDRemoved(id types.ID) bool {
  153. c.Lock()
  154. defer c.Unlock()
  155. return c.removed[id]
  156. }
  157. // PeerURLs returns a list of all peer addresses.
  158. // The returned list is sorted in ascending lexicographical order.
  159. func (c *Cluster) PeerURLs() []string {
  160. c.Lock()
  161. defer c.Unlock()
  162. urls := make([]string, 0)
  163. for _, p := range c.members {
  164. for _, addr := range p.PeerURLs {
  165. urls = append(urls, addr)
  166. }
  167. }
  168. sort.Strings(urls)
  169. return urls
  170. }
  171. // ClientURLs returns a list of all client addresses.
  172. // The returned list is sorted in ascending lexicographical order.
  173. func (c *Cluster) ClientURLs() []string {
  174. c.Lock()
  175. defer c.Unlock()
  176. urls := make([]string, 0)
  177. for _, p := range c.members {
  178. for _, url := range p.ClientURLs {
  179. urls = append(urls, url)
  180. }
  181. }
  182. sort.Strings(urls)
  183. return urls
  184. }
  185. func (c *Cluster) String() string {
  186. c.Lock()
  187. defer c.Unlock()
  188. sl := []string{}
  189. for _, m := range c.members {
  190. for _, u := range m.PeerURLs {
  191. sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
  192. }
  193. }
  194. sort.Strings(sl)
  195. return strings.Join(sl, ",")
  196. }
  197. func (c *Cluster) genID() {
  198. mIDs := c.MemberIDs()
  199. b := make([]byte, 8*len(mIDs))
  200. for i, id := range mIDs {
  201. binary.BigEndian.PutUint64(b[8*i:], uint64(id))
  202. }
  203. hash := sha1.Sum(b)
  204. c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
  205. }
  206. func (c *Cluster) SetID(id types.ID) { c.id = id }
  207. func (c *Cluster) SetStore(st store.Store) { c.store = st }
  208. func (c *Cluster) Recover() {
  209. c.members, c.removed = membersFromStore(c.store)
  210. }
  211. // ValidateConfigurationChange takes a proposed ConfChange and
  212. // ensures that it is still valid.
  213. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
  214. members, removed := membersFromStore(c.store)
  215. id := types.ID(cc.NodeID)
  216. if removed[id] {
  217. return ErrIDRemoved
  218. }
  219. switch cc.Type {
  220. case raftpb.ConfChangeAddNode:
  221. if members[id] != nil {
  222. return ErrIDExists
  223. }
  224. urls := make(map[string]bool)
  225. for _, m := range members {
  226. for _, u := range m.PeerURLs {
  227. urls[u] = true
  228. }
  229. }
  230. m := new(Member)
  231. if err := json.Unmarshal(cc.Context, m); err != nil {
  232. log.Panicf("unmarshal member should never fail: %v", err)
  233. }
  234. for _, u := range m.PeerURLs {
  235. if urls[u] {
  236. return ErrPeerURLexists
  237. }
  238. }
  239. case raftpb.ConfChangeRemoveNode:
  240. if members[id] == nil {
  241. return ErrIDNotFound
  242. }
  243. case raftpb.ConfChangeUpdateNode:
  244. if members[id] == nil {
  245. return ErrIDNotFound
  246. }
  247. urls := make(map[string]bool)
  248. for _, m := range members {
  249. if m.ID == id {
  250. continue
  251. }
  252. for _, u := range m.PeerURLs {
  253. urls[u] = true
  254. }
  255. }
  256. m := new(Member)
  257. if err := json.Unmarshal(cc.Context, m); err != nil {
  258. log.Panicf("unmarshal member should never fail: %v", err)
  259. }
  260. for _, u := range m.PeerURLs {
  261. if urls[u] {
  262. return ErrPeerURLexists
  263. }
  264. }
  265. default:
  266. log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
  267. }
  268. return nil
  269. }
  270. // AddMember adds a new Member into the cluster, and saves the given member's
  271. // raftAttributes into the store. The given member should have empty attributes.
  272. // A Member with a matching id must not exist.
  273. func (c *Cluster) AddMember(m *Member) {
  274. c.Lock()
  275. defer c.Unlock()
  276. b, err := json.Marshal(m.RaftAttributes)
  277. if err != nil {
  278. log.Panicf("marshal raftAttributes should never fail: %v", err)
  279. }
  280. p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
  281. if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
  282. log.Panicf("create raftAttributes should never fail: %v", err)
  283. }
  284. c.members[m.ID] = m
  285. }
  286. // RemoveMember removes a member from the store.
  287. // The given id MUST exist, or the function panics.
  288. func (c *Cluster) RemoveMember(id types.ID) {
  289. c.Lock()
  290. defer c.Unlock()
  291. if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
  292. log.Panicf("delete member should never fail: %v", err)
  293. }
  294. delete(c.members, id)
  295. if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
  296. log.Panicf("create removedMember should never fail: %v", err)
  297. }
  298. c.removed[id] = true
  299. }
  300. func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
  301. c.Lock()
  302. defer c.Unlock()
  303. c.members[id].Attributes = attr
  304. // TODO: update store in this function
  305. }
  306. func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
  307. c.Lock()
  308. defer c.Unlock()
  309. b, err := json.Marshal(raftAttr)
  310. if err != nil {
  311. log.Panicf("marshal raftAttributes should never fail: %v", err)
  312. }
  313. p := path.Join(memberStoreKey(id), raftAttributesSuffix)
  314. if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
  315. log.Panicf("update raftAttributes should never fail: %v", err)
  316. }
  317. c.members[id].RaftAttributes = raftAttr
  318. }
  319. // Validate ensures that there is no identical urls in the cluster peer list
  320. func (c *Cluster) Validate() error {
  321. urlMap := make(map[string]bool)
  322. for _, m := range c.Members() {
  323. for _, url := range m.PeerURLs {
  324. if urlMap[url] {
  325. return fmt.Errorf("duplicate url %v in cluster config", url)
  326. }
  327. urlMap[url] = true
  328. }
  329. }
  330. return nil
  331. }
  332. func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
  333. members := make(map[types.ID]*Member)
  334. removed := make(map[types.ID]bool)
  335. e, err := st.Get(storeMembersPrefix, true, true)
  336. if err != nil {
  337. if isKeyNotFound(err) {
  338. return members, removed
  339. }
  340. log.Panicf("get storeMembers should never fail: %v", err)
  341. }
  342. for _, n := range e.Node.Nodes {
  343. m, err := nodeToMember(n)
  344. if err != nil {
  345. log.Panicf("nodeToMember should never fail: %v", err)
  346. }
  347. members[m.ID] = m
  348. }
  349. e, err = st.Get(storeRemovedMembersPrefix, true, true)
  350. if err != nil {
  351. if isKeyNotFound(err) {
  352. return members, removed
  353. }
  354. log.Panicf("get storeRemovedMembers should never fail: %v", err)
  355. }
  356. for _, n := range e.Node.Nodes {
  357. removed[mustParseMemberIDFromKey(n.Key)] = true
  358. }
  359. return members, removed
  360. }
  361. // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
  362. // with the existing cluster. If the validation succeeds, it assigns the IDs
  363. // from the existing cluster to the local cluster.
  364. // If the validation fails, an error will be returned.
  365. func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
  366. ems := existing.Members()
  367. lms := local.Members()
  368. if len(ems) != len(lms) {
  369. return fmt.Errorf("member count is unequal")
  370. }
  371. sort.Sort(SortableMemberSliceByPeerURLs(ems))
  372. sort.Sort(SortableMemberSliceByPeerURLs(lms))
  373. for i := range ems {
  374. // TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
  375. if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
  376. return fmt.Errorf("unmatched member while checking PeerURLs")
  377. }
  378. lms[i].ID = ems[i].ID
  379. }
  380. local.members = make(map[types.ID]*Member)
  381. for _, m := range lms {
  382. local.members[m.ID] = m
  383. }
  384. return nil
  385. }