cluster.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "crypto/sha1"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "net/url"
  22. "path"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "github.com/coreos/etcd/pkg/flags"
  27. "github.com/coreos/etcd/pkg/netutil"
  28. "github.com/coreos/etcd/pkg/types"
  29. "github.com/coreos/etcd/raft/raftpb"
  30. "github.com/coreos/etcd/store"
  31. )
const (
	// raftAttributesSuffix is the final path element, under a member's store
	// key, at which that member's RaftAttributes are saved as JSON.
	raftAttributesSuffix = "raftAttributes"
	// attributesSuffix is presumably the analogous path element for a
	// member's Attributes; it is not referenced in this part of the file.
	attributesSuffix = "attributes"
)
// ClusterInfo is the set of query methods describing a cluster's identity
// and membership.
type ClusterInfo interface {
	// ID returns the cluster ID.
	ID() types.ID
	// ClientURLs returns an aggregate set of all URLs on which this
	// cluster is listening for client requests.
	ClientURLs() []string
	// Members returns a slice of members sorted by their ID.
	Members() []*Member
	// Member retrieves a particular member based on ID, or nil if the
	// member does not exist in the cluster.
	Member(id types.ID) *Member
	// IsIDRemoved checks whether the given ID has been removed from this
	// cluster at some point in the past.
	IsIDRemoved(id types.ID) bool
}
// Cluster is a list of Members that belong to the same raft cluster.
type Cluster struct {
	// id is the cluster ID; it is either set explicitly or derived from the
	// member IDs by genID.
	id types.ID
	// token is the cluster token handed to NewMember when members are built
	// from a cluster string.
	token string
	// store is where membership is persisted and recovered from. It is not
	// set by newCluster, so it stays nil until assigned.
	store store.Store
	// index is the raft index at which the cluster was updated at bootstrap
	// from remote cluster info.
	// It may have a higher value than the local raft index, because it
	// reflects a more recent view of the cluster.
	// TODO: upgrade it as last modified index
	index uint64
	sync.Mutex // guards the members and removed maps
	// members maps a member ID to its Member.
	members map[types.ID]*Member
	// removed contains the IDs of members removed from the cluster.
	// A removed ID cannot be reused.
	removed map[types.ID]bool
}
  68. // NewClusterFromString returns a Cluster instantiated from the given cluster token
  69. // and cluster string, by parsing members from a set of discovery-formatted
  70. // names-to-IPs, like:
  71. // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
  72. func NewClusterFromString(token string, cluster string) (*Cluster, error) {
  73. c := newCluster(token)
  74. v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
  75. if err != nil {
  76. return nil, err
  77. }
  78. for name, urls := range v {
  79. if len(urls) == 0 || urls[0] == "" {
  80. return nil, fmt.Errorf("Empty URL given for %q", name)
  81. }
  82. purls := &flags.URLsValue{}
  83. if err := purls.Set(strings.Join(urls, ",")); err != nil {
  84. return nil, err
  85. }
  86. m := NewMember(name, types.URLs(*purls), c.token, nil)
  87. if _, ok := c.members[m.ID]; ok {
  88. return nil, fmt.Errorf("Member exists with identical ID %v", m)
  89. }
  90. c.members[m.ID] = m
  91. }
  92. c.genID()
  93. return c, nil
  94. }
  95. func NewClusterFromStore(token string, st store.Store) *Cluster {
  96. c := newCluster(token)
  97. c.store = st
  98. c.members, c.removed = membersFromStore(c.store)
  99. return c
  100. }
  101. func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
  102. c := newCluster(token)
  103. c.id = id
  104. for _, m := range membs {
  105. c.members[m.ID] = m
  106. }
  107. return c
  108. }
  109. func newCluster(token string) *Cluster {
  110. return &Cluster{
  111. token: token,
  112. members: make(map[types.ID]*Member),
  113. removed: make(map[types.ID]bool),
  114. }
  115. }
  116. func (c *Cluster) ID() types.ID { return c.id }
  117. func (c *Cluster) Members() []*Member {
  118. c.Lock()
  119. defer c.Unlock()
  120. var sms SortableMemberSlice
  121. for _, m := range c.members {
  122. sms = append(sms, m.Clone())
  123. }
  124. sort.Sort(sms)
  125. return []*Member(sms)
  126. }
  127. func (c *Cluster) Member(id types.ID) *Member {
  128. c.Lock()
  129. defer c.Unlock()
  130. return c.members[id].Clone()
  131. }
  132. // MemberByName returns a Member with the given name if exists.
  133. // If more than one member has the given name, it will panic.
  134. func (c *Cluster) MemberByName(name string) *Member {
  135. c.Lock()
  136. defer c.Unlock()
  137. var memb *Member
  138. for _, m := range c.members {
  139. if m.Name == name {
  140. if memb != nil {
  141. log.Panicf("two members with the given name %q exist", name)
  142. }
  143. memb = m
  144. }
  145. }
  146. return memb.Clone()
  147. }
  148. func (c *Cluster) MemberIDs() []types.ID {
  149. c.Lock()
  150. defer c.Unlock()
  151. var ids []types.ID
  152. for _, m := range c.members {
  153. ids = append(ids, m.ID)
  154. }
  155. sort.Sort(types.IDSlice(ids))
  156. return ids
  157. }
  158. func (c *Cluster) IsIDRemoved(id types.ID) bool {
  159. c.Lock()
  160. defer c.Unlock()
  161. return c.removed[id]
  162. }
  163. // PeerURLs returns a list of all peer addresses.
  164. // The returned list is sorted in ascending lexicographical order.
  165. func (c *Cluster) PeerURLs() []string {
  166. c.Lock()
  167. defer c.Unlock()
  168. urls := make([]string, 0)
  169. for _, p := range c.members {
  170. for _, addr := range p.PeerURLs {
  171. urls = append(urls, addr)
  172. }
  173. }
  174. sort.Strings(urls)
  175. return urls
  176. }
  177. // ClientURLs returns a list of all client addresses.
  178. // The returned list is sorted in ascending lexicographical order.
  179. func (c *Cluster) ClientURLs() []string {
  180. c.Lock()
  181. defer c.Unlock()
  182. urls := make([]string, 0)
  183. for _, p := range c.members {
  184. for _, url := range p.ClientURLs {
  185. urls = append(urls, url)
  186. }
  187. }
  188. sort.Strings(urls)
  189. return urls
  190. }
  191. func (c *Cluster) String() string {
  192. c.Lock()
  193. defer c.Unlock()
  194. sl := []string{}
  195. for _, m := range c.members {
  196. for _, u := range m.PeerURLs {
  197. sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
  198. }
  199. }
  200. sort.Strings(sl)
  201. return strings.Join(sl, ",")
  202. }
  203. func (c *Cluster) genID() {
  204. mIDs := c.MemberIDs()
  205. b := make([]byte, 8*len(mIDs))
  206. for i, id := range mIDs {
  207. binary.BigEndian.PutUint64(b[8*i:], uint64(id))
  208. }
  209. hash := sha1.Sum(b)
  210. c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
  211. }
  212. func (c *Cluster) SetID(id types.ID) { c.id = id }
  213. func (c *Cluster) SetStore(st store.Store) { c.store = st }
  214. func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
  215. func (c *Cluster) Recover() {
  216. c.members, c.removed = membersFromStore(c.store)
  217. }
  218. // ValidateConfigurationChange takes a proposed ConfChange and
  219. // ensures that it is still valid.
  220. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
  221. members, removed := membersFromStore(c.store)
  222. id := types.ID(cc.NodeID)
  223. if removed[id] {
  224. return ErrIDRemoved
  225. }
  226. switch cc.Type {
  227. case raftpb.ConfChangeAddNode:
  228. if members[id] != nil {
  229. return ErrIDExists
  230. }
  231. urls := make(map[string]bool)
  232. for _, m := range members {
  233. for _, u := range m.PeerURLs {
  234. urls[u] = true
  235. }
  236. }
  237. m := new(Member)
  238. if err := json.Unmarshal(cc.Context, m); err != nil {
  239. log.Panicf("unmarshal member should never fail: %v", err)
  240. }
  241. for _, u := range m.PeerURLs {
  242. if urls[u] {
  243. return ErrPeerURLexists
  244. }
  245. }
  246. case raftpb.ConfChangeRemoveNode:
  247. if members[id] == nil {
  248. return ErrIDNotFound
  249. }
  250. case raftpb.ConfChangeUpdateNode:
  251. if members[id] == nil {
  252. return ErrIDNotFound
  253. }
  254. urls := make(map[string]bool)
  255. for _, m := range members {
  256. if m.ID == id {
  257. continue
  258. }
  259. for _, u := range m.PeerURLs {
  260. urls[u] = true
  261. }
  262. }
  263. m := new(Member)
  264. if err := json.Unmarshal(cc.Context, m); err != nil {
  265. log.Panicf("unmarshal member should never fail: %v", err)
  266. }
  267. for _, u := range m.PeerURLs {
  268. if urls[u] {
  269. return ErrPeerURLexists
  270. }
  271. }
  272. default:
  273. log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
  274. }
  275. return nil
  276. }
  277. // AddMember adds a new Member into the cluster, and saves the given member's
  278. // raftAttributes into the store. The given member should have empty attributes.
  279. // A Member with a matching id must not exist.
  280. func (c *Cluster) AddMember(m *Member) {
  281. c.Lock()
  282. defer c.Unlock()
  283. b, err := json.Marshal(m.RaftAttributes)
  284. if err != nil {
  285. log.Panicf("marshal raftAttributes should never fail: %v", err)
  286. }
  287. p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
  288. if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
  289. log.Panicf("create raftAttributes should never fail: %v", err)
  290. }
  291. c.members[m.ID] = m
  292. }
  293. // RemoveMember removes a member from the store.
  294. // The given id MUST exist, or the function panics.
  295. func (c *Cluster) RemoveMember(id types.ID) {
  296. c.Lock()
  297. defer c.Unlock()
  298. if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
  299. log.Panicf("delete member should never fail: %v", err)
  300. }
  301. delete(c.members, id)
  302. if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
  303. log.Panicf("create removedMember should never fail: %v", err)
  304. }
  305. c.removed[id] = true
  306. }
  307. func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
  308. c.Lock()
  309. defer c.Unlock()
  310. c.members[id].Attributes = attr
  311. // TODO: update store in this function
  312. }
  313. func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
  314. c.Lock()
  315. defer c.Unlock()
  316. b, err := json.Marshal(raftAttr)
  317. if err != nil {
  318. log.Panicf("marshal raftAttributes should never fail: %v", err)
  319. }
  320. p := path.Join(memberStoreKey(id), raftAttributesSuffix)
  321. if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
  322. log.Panicf("update raftAttributes should never fail: %v", err)
  323. }
  324. c.members[id].RaftAttributes = raftAttr
  325. }
  326. // Validate ensures that there is no identical urls in the cluster peer list
  327. func (c *Cluster) Validate() error {
  328. urlMap := make(map[string]bool)
  329. for _, m := range c.Members() {
  330. for _, url := range m.PeerURLs {
  331. if urlMap[url] {
  332. return fmt.Errorf("duplicate url %v in cluster config", url)
  333. }
  334. urlMap[url] = true
  335. }
  336. }
  337. return nil
  338. }
  339. func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
  340. members := make(map[types.ID]*Member)
  341. removed := make(map[types.ID]bool)
  342. e, err := st.Get(storeMembersPrefix, true, true)
  343. if err != nil {
  344. if isKeyNotFound(err) {
  345. return members, removed
  346. }
  347. log.Panicf("get storeMembers should never fail: %v", err)
  348. }
  349. for _, n := range e.Node.Nodes {
  350. m, err := nodeToMember(n)
  351. if err != nil {
  352. log.Panicf("nodeToMember should never fail: %v", err)
  353. }
  354. members[m.ID] = m
  355. }
  356. e, err = st.Get(storeRemovedMembersPrefix, true, true)
  357. if err != nil {
  358. if isKeyNotFound(err) {
  359. return members, removed
  360. }
  361. log.Panicf("get storeRemovedMembers should never fail: %v", err)
  362. }
  363. for _, n := range e.Node.Nodes {
  364. removed[mustParseMemberIDFromKey(n.Key)] = true
  365. }
  366. return members, removed
  367. }
  368. // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
  369. // with the existing cluster. If the validation succeeds, it assigns the IDs
  370. // from the existing cluster to the local cluster.
  371. // If the validation fails, an error will be returned.
  372. func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
  373. ems := existing.Members()
  374. lms := local.Members()
  375. if len(ems) != len(lms) {
  376. return fmt.Errorf("member count is unequal")
  377. }
  378. sort.Sort(SortableMemberSliceByPeerURLs(ems))
  379. sort.Sort(SortableMemberSliceByPeerURLs(lms))
  380. for i := range ems {
  381. // TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
  382. if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
  383. return fmt.Errorf("unmatched member while checking PeerURLs")
  384. }
  385. lms[i].ID = ems[i].ID
  386. }
  387. local.members = make(map[types.ID]*Member)
  388. for _, m := range lms {
  389. local.members[m.ID] = m
  390. }
  391. return nil
  392. }