cluster.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "crypto/sha1"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "net/url"
  22. "path"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "github.com/coreos/etcd/pkg/flags"
  27. "github.com/coreos/etcd/pkg/netutil"
  28. "github.com/coreos/etcd/pkg/types"
  29. "github.com/coreos/etcd/raft/raftpb"
  30. "github.com/coreos/etcd/rafthttp"
  31. "github.com/coreos/etcd/store"
  32. )
  33. const (
  34. raftAttributesSuffix = "raftAttributes"
  35. attributesSuffix = "attributes"
  36. )
// ClusterInfo exposes read-only queries about cluster membership.
// *Cluster in this file provides every method listed here.
type ClusterInfo interface {
	// ID returns the cluster ID
	ID() types.ID
	// ClientURLs returns an aggregate set of all URLs on which this
	// cluster is listening for client requests
	ClientURLs() []string
	// Members returns a slice of members sorted by their ID
	Members() []*Member
	// Member retrieves a particular member based on ID, or nil if the
	// member does not exist in the cluster
	Member(id types.ID) *Member
	// IsIDRemoved checks whether the given ID has been removed from this
	// cluster at some point in the past
	IsIDRemoved(id types.ID) bool
}
// Cluster is a list of Members that belong to the same raft cluster.
//
// It keeps an in-memory view (members, removed) next to a store.Store holding
// the persisted membership. AddMember, RemoveMember and UpdateRaftAttributes
// write the store first, and update the in-memory view and the transport only
// when the given raft index is greater than index.
type Cluster struct {
	id    types.ID
	token string
	store store.Store
	// index is the raft index that cluster is updated at bootstrap
	// from remote cluster info.
	// It may have a higher value than local raft index, because it
	// displays a further view of the cluster.
	// TODO: upgrade it as last modified index
	index uint64
	// transport and members maintains the view of the cluster at index.
	// This might be more up to date than what stores in the store since
	// the index may be higher than store index, which may happen when the
	// cluster is updated from remote cluster info.
	transport rafthttp.Transporter
	sync.Mutex // guards members and removed map
	members map[types.ID]*Member
	// removed contains the ids of removed members in the cluster.
	// removed id cannot be reused.
	removed map[types.ID]bool
}
  74. // NewClusterFromString returns a Cluster instantiated from the given cluster token
  75. // and cluster string, by parsing members from a set of discovery-formatted
  76. // names-to-IPs, like:
  77. // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
  78. func NewClusterFromString(token string, cluster string) (*Cluster, error) {
  79. c := newCluster(token)
  80. v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
  81. if err != nil {
  82. return nil, err
  83. }
  84. for name, urls := range v {
  85. if len(urls) == 0 || urls[0] == "" {
  86. return nil, fmt.Errorf("Empty URL given for %q", name)
  87. }
  88. purls := &flags.URLsValue{}
  89. if err := purls.Set(strings.Join(urls, ",")); err != nil {
  90. return nil, err
  91. }
  92. m := NewMember(name, types.URLs(*purls), c.token, nil)
  93. if _, ok := c.members[m.ID]; ok {
  94. return nil, fmt.Errorf("Member exists with identical ID %v", m)
  95. }
  96. c.members[m.ID] = m
  97. }
  98. c.genID()
  99. return c, nil
  100. }
  101. func NewClusterFromStore(token string, st store.Store) *Cluster {
  102. c := newCluster(token)
  103. c.store = st
  104. c.members, c.removed = membersFromStore(c.store)
  105. return c
  106. }
  107. func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
  108. c := newCluster(token)
  109. c.id = id
  110. for _, m := range membs {
  111. c.members[m.ID] = m
  112. }
  113. return c
  114. }
  115. func newCluster(token string) *Cluster {
  116. return &Cluster{
  117. token: token,
  118. members: make(map[types.ID]*Member),
  119. removed: make(map[types.ID]bool),
  120. }
  121. }
  122. func (c *Cluster) ID() types.ID { return c.id }
  123. func (c *Cluster) Members() []*Member {
  124. c.Lock()
  125. defer c.Unlock()
  126. var sms SortableMemberSlice
  127. for _, m := range c.members {
  128. sms = append(sms, m.Clone())
  129. }
  130. sort.Sort(sms)
  131. return []*Member(sms)
  132. }
  133. func (c *Cluster) Member(id types.ID) *Member {
  134. c.Lock()
  135. defer c.Unlock()
  136. return c.members[id].Clone()
  137. }
  138. // MemberByName returns a Member with the given name if exists.
  139. // If more than one member has the given name, it will panic.
  140. func (c *Cluster) MemberByName(name string) *Member {
  141. c.Lock()
  142. defer c.Unlock()
  143. var memb *Member
  144. for _, m := range c.members {
  145. if m.Name == name {
  146. if memb != nil {
  147. log.Panicf("two members with the given name %q exist", name)
  148. }
  149. memb = m
  150. }
  151. }
  152. return memb.Clone()
  153. }
  154. func (c *Cluster) MemberIDs() []types.ID {
  155. c.Lock()
  156. defer c.Unlock()
  157. var ids []types.ID
  158. for _, m := range c.members {
  159. ids = append(ids, m.ID)
  160. }
  161. sort.Sort(types.IDSlice(ids))
  162. return ids
  163. }
  164. func (c *Cluster) IsIDRemoved(id types.ID) bool {
  165. c.Lock()
  166. defer c.Unlock()
  167. return c.removed[id]
  168. }
  169. // PeerURLs returns a list of all peer addresses.
  170. // The returned list is sorted in ascending lexicographical order.
  171. func (c *Cluster) PeerURLs() []string {
  172. c.Lock()
  173. defer c.Unlock()
  174. urls := make([]string, 0)
  175. for _, p := range c.members {
  176. for _, addr := range p.PeerURLs {
  177. urls = append(urls, addr)
  178. }
  179. }
  180. sort.Strings(urls)
  181. return urls
  182. }
  183. // ClientURLs returns a list of all client addresses.
  184. // The returned list is sorted in ascending lexicographical order.
  185. func (c *Cluster) ClientURLs() []string {
  186. c.Lock()
  187. defer c.Unlock()
  188. urls := make([]string, 0)
  189. for _, p := range c.members {
  190. for _, url := range p.ClientURLs {
  191. urls = append(urls, url)
  192. }
  193. }
  194. sort.Strings(urls)
  195. return urls
  196. }
  197. func (c *Cluster) String() string {
  198. c.Lock()
  199. defer c.Unlock()
  200. sl := []string{}
  201. for _, m := range c.members {
  202. for _, u := range m.PeerURLs {
  203. sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
  204. }
  205. }
  206. sort.Strings(sl)
  207. return strings.Join(sl, ",")
  208. }
  209. func (c *Cluster) genID() {
  210. mIDs := c.MemberIDs()
  211. b := make([]byte, 8*len(mIDs))
  212. for i, id := range mIDs {
  213. binary.BigEndian.PutUint64(b[8*i:], uint64(id))
  214. }
  215. hash := sha1.Sum(b)
  216. c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
  217. }
  218. func (c *Cluster) SetID(id types.ID) { c.id = id }
  219. func (c *Cluster) SetStore(st store.Store) { c.store = st }
  220. func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
  221. func (c *Cluster) Recover() {
  222. c.members, c.removed = membersFromStore(c.store)
  223. // recover transport
  224. c.transport.RemoveAllPeers()
  225. for _, m := range c.Members() {
  226. c.transport.AddPeer(m.ID, m.PeerURLs)
  227. }
  228. }
  229. func (c *Cluster) SetTransport(tr rafthttp.Transporter) {
  230. c.transport = tr
  231. // add all the remote members into transport
  232. for _, m := range c.Members() {
  233. c.transport.AddPeer(m.ID, m.PeerURLs)
  234. }
  235. }
  236. // ValidateConfigurationChange takes a proposed ConfChange and
  237. // ensures that it is still valid.
  238. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
  239. members, removed := membersFromStore(c.store)
  240. id := types.ID(cc.NodeID)
  241. if removed[id] {
  242. return ErrIDRemoved
  243. }
  244. switch cc.Type {
  245. case raftpb.ConfChangeAddNode:
  246. if members[id] != nil {
  247. return ErrIDExists
  248. }
  249. urls := make(map[string]bool)
  250. for _, m := range members {
  251. for _, u := range m.PeerURLs {
  252. urls[u] = true
  253. }
  254. }
  255. m := new(Member)
  256. if err := json.Unmarshal(cc.Context, m); err != nil {
  257. log.Panicf("unmarshal member should never fail: %v", err)
  258. }
  259. for _, u := range m.PeerURLs {
  260. if urls[u] {
  261. return ErrPeerURLexists
  262. }
  263. }
  264. case raftpb.ConfChangeRemoveNode:
  265. if members[id] == nil {
  266. return ErrIDNotFound
  267. }
  268. case raftpb.ConfChangeUpdateNode:
  269. if members[id] == nil {
  270. return ErrIDNotFound
  271. }
  272. urls := make(map[string]bool)
  273. for _, m := range members {
  274. if m.ID == id {
  275. continue
  276. }
  277. for _, u := range m.PeerURLs {
  278. urls[u] = true
  279. }
  280. }
  281. m := new(Member)
  282. if err := json.Unmarshal(cc.Context, m); err != nil {
  283. log.Panicf("unmarshal member should never fail: %v", err)
  284. }
  285. for _, u := range m.PeerURLs {
  286. if urls[u] {
  287. return ErrPeerURLexists
  288. }
  289. }
  290. default:
  291. log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
  292. }
  293. return nil
  294. }
  295. // AddMember adds a new Member into the cluster, and saves the given member's
  296. // raftAttributes into the store. The given member should have empty attributes.
  297. // A Member with a matching id must not exist.
  298. // The given index indicates when the event happens.
  299. func (c *Cluster) AddMember(m *Member, index uint64) {
  300. c.Lock()
  301. defer c.Unlock()
  302. b, err := json.Marshal(m.RaftAttributes)
  303. if err != nil {
  304. log.Panicf("marshal raftAttributes should never fail: %v", err)
  305. }
  306. p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
  307. if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
  308. log.Panicf("create raftAttributes should never fail: %v", err)
  309. }
  310. if index > c.index {
  311. // TODO: check member does not exist in the cluster
  312. // New bootstrapped member has initial cluster, which contains unadded
  313. // peers.
  314. c.members[m.ID] = m
  315. c.transport.AddPeer(m.ID, m.PeerURLs)
  316. c.index = index
  317. }
  318. }
  319. // RemoveMember removes a member from the store.
  320. // The given id MUST exist, or the function panics.
  321. // The given index indicates when the event happens.
  322. func (c *Cluster) RemoveMember(id types.ID, index uint64) {
  323. c.Lock()
  324. defer c.Unlock()
  325. if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
  326. log.Panicf("delete member should never fail: %v", err)
  327. }
  328. if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
  329. log.Panicf("create removedMember should never fail: %v", err)
  330. }
  331. if index > c.index {
  332. if _, ok := c.members[id]; !ok {
  333. log.Panicf("member %s should exist in the cluster", id)
  334. }
  335. delete(c.members, id)
  336. c.removed[id] = true
  337. c.transport.RemovePeer(id)
  338. c.index = index
  339. }
  340. }
  341. func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
  342. c.Lock()
  343. defer c.Unlock()
  344. c.members[id].Attributes = attr
  345. // TODO: update store in this function
  346. }
  347. // UpdateRaftAttributes updates the raft attributes of the given id.
  348. // The given index indicates when the event happens.
  349. func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, index uint64) {
  350. c.Lock()
  351. defer c.Unlock()
  352. b, err := json.Marshal(raftAttr)
  353. if err != nil {
  354. log.Panicf("marshal raftAttributes should never fail: %v", err)
  355. }
  356. p := path.Join(memberStoreKey(id), raftAttributesSuffix)
  357. if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
  358. log.Panicf("update raftAttributes should never fail: %v", err)
  359. }
  360. if index > c.index {
  361. c.members[id].RaftAttributes = raftAttr
  362. c.transport.UpdatePeer(id, raftAttr.PeerURLs)
  363. c.index = index
  364. }
  365. }
  366. // Validate ensures that there is no identical urls in the cluster peer list
  367. func (c *Cluster) Validate() error {
  368. urlMap := make(map[string]bool)
  369. for _, m := range c.Members() {
  370. for _, url := range m.PeerURLs {
  371. if urlMap[url] {
  372. return fmt.Errorf("duplicate url %v in cluster config", url)
  373. }
  374. urlMap[url] = true
  375. }
  376. }
  377. return nil
  378. }
  379. func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
  380. members := make(map[types.ID]*Member)
  381. removed := make(map[types.ID]bool)
  382. e, err := st.Get(storeMembersPrefix, true, true)
  383. if err != nil {
  384. if isKeyNotFound(err) {
  385. return members, removed
  386. }
  387. log.Panicf("get storeMembers should never fail: %v", err)
  388. }
  389. for _, n := range e.Node.Nodes {
  390. m, err := nodeToMember(n)
  391. if err != nil {
  392. log.Panicf("nodeToMember should never fail: %v", err)
  393. }
  394. members[m.ID] = m
  395. }
  396. e, err = st.Get(storeRemovedMembersPrefix, true, true)
  397. if err != nil {
  398. if isKeyNotFound(err) {
  399. return members, removed
  400. }
  401. log.Panicf("get storeRemovedMembers should never fail: %v", err)
  402. }
  403. for _, n := range e.Node.Nodes {
  404. removed[mustParseMemberIDFromKey(n.Key)] = true
  405. }
  406. return members, removed
  407. }
  408. // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
  409. // with the existing cluster. If the validation succeeds, it assigns the IDs
  410. // from the existing cluster to the local cluster.
  411. // If the validation fails, an error will be returned.
  412. func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
  413. ems := existing.Members()
  414. lms := local.Members()
  415. if len(ems) != len(lms) {
  416. return fmt.Errorf("member count is unequal")
  417. }
  418. sort.Sort(SortableMemberSliceByPeerURLs(ems))
  419. sort.Sort(SortableMemberSliceByPeerURLs(lms))
  420. for i := range ems {
  421. // TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
  422. if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
  423. return fmt.Errorf("unmatched member while checking PeerURLs")
  424. }
  425. lms[i].ID = ems[i].ID
  426. }
  427. local.members = make(map[types.ID]*Member)
  428. for _, m := range lms {
  429. local.members[m.ID] = m
  430. }
  431. return nil
  432. }