cluster.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "crypto/sha1"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "net/url"
  22. "path"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
  27. "github.com/coreos/etcd/pkg/flags"
  28. "github.com/coreos/etcd/pkg/netutil"
  29. "github.com/coreos/etcd/pkg/types"
  30. "github.com/coreos/etcd/raft/raftpb"
  31. "github.com/coreos/etcd/store"
  32. )
  // raftAttributesSuffix is joined onto memberStoreKey(id) to form the store
// key holding a member's JSON-encoded RaftAttributes.
const raftAttributesSuffix = "raftAttributes"

// attributesSuffix is the sibling key name for a member's attributes.
// NOTE(review): not referenced in this file; presumably used where member
// Attributes are persisted — confirm.
const attributesSuffix = "attributes"
  37. type ClusterInfo interface {
  38. // ID returns the cluster ID
  39. ID() types.ID
  40. // ClientURLs returns an aggregate set of all URLs on which this
  41. // cluster is listening for client requests
  42. ClientURLs() []string
  43. // Members returns a slice of members sorted by their ID
  44. Members() []*Member
  45. // Member retrieves a particular member based on ID, or nil if the
  46. // member does not exist in the cluster
  47. Member(id types.ID) *Member
  48. // IsIDRemoved checks whether the given ID has been removed from this
  49. // cluster at some point in the past
  50. IsIDRemoved(id types.ID) bool
  51. }
  // Cluster is a list of Members that belong to the same raft cluster.
//
// The embedded Mutex guards only the fields declared after it (version,
// members, removed). The fields id, token and store are declared before it
// and are read and written without locking (e.g. by ID, SetID and SetStore).
type Cluster struct {
	// id is the cluster ID. genID derives it from the member IDs;
	// NewClusterFromMembers and SetID assign it directly.
	id types.ID
	// token is the cluster token; NewClusterFromString passes it to NewMember.
	token string
	// store is where membership is persisted: AddMember, RemoveMember and
	// UpdateRaftAttributes write to it, and membersFromStore reads from it.
	store store.Store
	sync.Mutex // guards the fields below
	// version is the cluster version; nil until one is set or loaded.
	version *semver.Version
	// members indexes the current members by ID.
	members map[types.ID]*Member
	// removed contains the ids of removed members in the cluster.
	// removed id cannot be reused.
	removed map[types.ID]bool
}
  64. // NewClusterFromString returns a Cluster instantiated from the given cluster token
  65. // and cluster string, by parsing members from a set of discovery-formatted
  66. // names-to-IPs, like:
  67. // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
  68. func NewClusterFromString(token string, cluster string) (*Cluster, error) {
  69. c := newCluster(token)
  70. v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
  71. if err != nil {
  72. return nil, err
  73. }
  74. for name, urls := range v {
  75. if len(urls) == 0 || urls[0] == "" {
  76. return nil, fmt.Errorf("Empty URL given for %q", name)
  77. }
  78. purls := &flags.URLsValue{}
  79. if err := purls.Set(strings.Join(urls, ",")); err != nil {
  80. return nil, err
  81. }
  82. m := NewMember(name, types.URLs(*purls), c.token, nil)
  83. if _, ok := c.members[m.ID]; ok {
  84. return nil, fmt.Errorf("Member exists with identical ID %v", m)
  85. }
  86. c.members[m.ID] = m
  87. }
  88. c.genID()
  89. return c, nil
  90. }
  91. func NewClusterFromStore(token string, st store.Store) *Cluster {
  92. c := newCluster(token)
  93. c.store = st
  94. c.members, c.removed = membersFromStore(c.store)
  95. c.version = clusterVersionFromStore(c.store)
  96. return c
  97. }
  98. func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
  99. c := newCluster(token)
  100. c.id = id
  101. for _, m := range membs {
  102. c.members[m.ID] = m
  103. }
  104. return c
  105. }
  106. func newCluster(token string) *Cluster {
  107. return &Cluster{
  108. token: token,
  109. members: make(map[types.ID]*Member),
  110. removed: make(map[types.ID]bool),
  111. }
  112. }
  113. func (c *Cluster) ID() types.ID { return c.id }
  114. func (c *Cluster) Members() []*Member {
  115. c.Lock()
  116. defer c.Unlock()
  117. var sms SortableMemberSlice
  118. for _, m := range c.members {
  119. sms = append(sms, m.Clone())
  120. }
  121. sort.Sort(sms)
  122. return []*Member(sms)
  123. }
  124. func (c *Cluster) Member(id types.ID) *Member {
  125. c.Lock()
  126. defer c.Unlock()
  127. return c.members[id].Clone()
  128. }
  129. // MemberByName returns a Member with the given name if exists.
  130. // If more than one member has the given name, it will panic.
  131. func (c *Cluster) MemberByName(name string) *Member {
  132. c.Lock()
  133. defer c.Unlock()
  134. var memb *Member
  135. for _, m := range c.members {
  136. if m.Name == name {
  137. if memb != nil {
  138. log.Panicf("two members with the given name %q exist", name)
  139. }
  140. memb = m
  141. }
  142. }
  143. return memb.Clone()
  144. }
  145. func (c *Cluster) MemberIDs() []types.ID {
  146. c.Lock()
  147. defer c.Unlock()
  148. var ids []types.ID
  149. for _, m := range c.members {
  150. ids = append(ids, m.ID)
  151. }
  152. sort.Sort(types.IDSlice(ids))
  153. return ids
  154. }
  155. func (c *Cluster) IsIDRemoved(id types.ID) bool {
  156. c.Lock()
  157. defer c.Unlock()
  158. return c.removed[id]
  159. }
  160. // PeerURLs returns a list of all peer addresses.
  161. // The returned list is sorted in ascending lexicographical order.
  162. func (c *Cluster) PeerURLs() []string {
  163. c.Lock()
  164. defer c.Unlock()
  165. urls := make([]string, 0)
  166. for _, p := range c.members {
  167. for _, addr := range p.PeerURLs {
  168. urls = append(urls, addr)
  169. }
  170. }
  171. sort.Strings(urls)
  172. return urls
  173. }
  174. // ClientURLs returns a list of all client addresses.
  175. // The returned list is sorted in ascending lexicographical order.
  176. func (c *Cluster) ClientURLs() []string {
  177. c.Lock()
  178. defer c.Unlock()
  179. urls := make([]string, 0)
  180. for _, p := range c.members {
  181. for _, url := range p.ClientURLs {
  182. urls = append(urls, url)
  183. }
  184. }
  185. sort.Strings(urls)
  186. return urls
  187. }
  188. func (c *Cluster) String() string {
  189. c.Lock()
  190. defer c.Unlock()
  191. sl := []string{}
  192. for _, m := range c.members {
  193. for _, u := range m.PeerURLs {
  194. sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
  195. }
  196. }
  197. sort.Strings(sl)
  198. return strings.Join(sl, ",")
  199. }
  200. func (c *Cluster) genID() {
  201. mIDs := c.MemberIDs()
  202. b := make([]byte, 8*len(mIDs))
  203. for i, id := range mIDs {
  204. binary.BigEndian.PutUint64(b[8*i:], uint64(id))
  205. }
  206. hash := sha1.Sum(b)
  207. c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
  208. }
  209. func (c *Cluster) SetID(id types.ID) { c.id = id }
  210. func (c *Cluster) SetStore(st store.Store) { c.store = st }
  211. func (c *Cluster) Recover() {
  212. c.members, c.removed = membersFromStore(c.store)
  213. c.version = clusterVersionFromStore(c.store)
  214. }
  215. // ValidateConfigurationChange takes a proposed ConfChange and
  216. // ensures that it is still valid.
  217. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
  218. members, removed := membersFromStore(c.store)
  219. id := types.ID(cc.NodeID)
  220. if removed[id] {
  221. return ErrIDRemoved
  222. }
  223. switch cc.Type {
  224. case raftpb.ConfChangeAddNode:
  225. if members[id] != nil {
  226. return ErrIDExists
  227. }
  228. urls := make(map[string]bool)
  229. for _, m := range members {
  230. for _, u := range m.PeerURLs {
  231. urls[u] = true
  232. }
  233. }
  234. m := new(Member)
  235. if err := json.Unmarshal(cc.Context, m); err != nil {
  236. log.Panicf("unmarshal member should never fail: %v", err)
  237. }
  238. for _, u := range m.PeerURLs {
  239. if urls[u] {
  240. return ErrPeerURLexists
  241. }
  242. }
  243. case raftpb.ConfChangeRemoveNode:
  244. if members[id] == nil {
  245. return ErrIDNotFound
  246. }
  247. case raftpb.ConfChangeUpdateNode:
  248. if members[id] == nil {
  249. return ErrIDNotFound
  250. }
  251. urls := make(map[string]bool)
  252. for _, m := range members {
  253. if m.ID == id {
  254. continue
  255. }
  256. for _, u := range m.PeerURLs {
  257. urls[u] = true
  258. }
  259. }
  260. m := new(Member)
  261. if err := json.Unmarshal(cc.Context, m); err != nil {
  262. log.Panicf("unmarshal member should never fail: %v", err)
  263. }
  264. for _, u := range m.PeerURLs {
  265. if urls[u] {
  266. return ErrPeerURLexists
  267. }
  268. }
  269. default:
  270. log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
  271. }
  272. return nil
  273. }
  274. // AddMember adds a new Member into the cluster, and saves the given member's
  275. // raftAttributes into the store. The given member should have empty attributes.
  276. // A Member with a matching id must not exist.
  277. func (c *Cluster) AddMember(m *Member) {
  278. c.Lock()
  279. defer c.Unlock()
  280. b, err := json.Marshal(m.RaftAttributes)
  281. if err != nil {
  282. log.Panicf("marshal raftAttributes should never fail: %v", err)
  283. }
  284. p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
  285. if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
  286. log.Panicf("create raftAttributes should never fail: %v", err)
  287. }
  288. c.members[m.ID] = m
  289. }
  290. // RemoveMember removes a member from the store.
  291. // The given id MUST exist, or the function panics.
  292. func (c *Cluster) RemoveMember(id types.ID) {
  293. c.Lock()
  294. defer c.Unlock()
  295. if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
  296. log.Panicf("delete member should never fail: %v", err)
  297. }
  298. delete(c.members, id)
  299. if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
  300. log.Panicf("create removedMember should never fail: %v", err)
  301. }
  302. c.removed[id] = true
  303. }
  304. func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
  305. c.Lock()
  306. defer c.Unlock()
  307. c.members[id].Attributes = attr
  308. // TODO: update store in this function
  309. }
  310. func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
  311. c.Lock()
  312. defer c.Unlock()
  313. b, err := json.Marshal(raftAttr)
  314. if err != nil {
  315. log.Panicf("marshal raftAttributes should never fail: %v", err)
  316. }
  317. p := path.Join(memberStoreKey(id), raftAttributesSuffix)
  318. if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
  319. log.Panicf("update raftAttributes should never fail: %v", err)
  320. }
  321. c.members[id].RaftAttributes = raftAttr
  322. }
  323. func (c *Cluster) Version() *semver.Version {
  324. c.Lock()
  325. defer c.Unlock()
  326. if c.version == nil {
  327. return nil
  328. }
  329. return semver.Must(semver.NewVersion(c.version.String()))
  330. }
  331. func (c *Cluster) SetVersion(ver *semver.Version) {
  332. c.Lock()
  333. defer c.Unlock()
  334. if c.version != nil {
  335. log.Printf("etcdsever: updated the cluster version from %v to %v", c.version.String(), ver.String())
  336. } else {
  337. log.Printf("etcdsever: set the initial cluster version to %v", ver.String())
  338. }
  339. c.version = ver
  340. }
  341. // Validate ensures that there is no identical urls in the cluster peer list
  342. func (c *Cluster) Validate() error {
  343. urlMap := make(map[string]bool)
  344. for _, m := range c.Members() {
  345. for _, url := range m.PeerURLs {
  346. if urlMap[url] {
  347. return fmt.Errorf("duplicate url %v in cluster config", url)
  348. }
  349. urlMap[url] = true
  350. }
  351. }
  352. return nil
  353. }
  354. func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
  355. members := make(map[types.ID]*Member)
  356. removed := make(map[types.ID]bool)
  357. e, err := st.Get(storeMembersPrefix, true, true)
  358. if err != nil {
  359. if isKeyNotFound(err) {
  360. return members, removed
  361. }
  362. log.Panicf("get storeMembers should never fail: %v", err)
  363. }
  364. for _, n := range e.Node.Nodes {
  365. m, err := nodeToMember(n)
  366. if err != nil {
  367. log.Panicf("nodeToMember should never fail: %v", err)
  368. }
  369. members[m.ID] = m
  370. }
  371. e, err = st.Get(storeRemovedMembersPrefix, true, true)
  372. if err != nil {
  373. if isKeyNotFound(err) {
  374. return members, removed
  375. }
  376. log.Panicf("get storeRemovedMembers should never fail: %v", err)
  377. }
  378. for _, n := range e.Node.Nodes {
  379. removed[mustParseMemberIDFromKey(n.Key)] = true
  380. }
  381. return members, removed
  382. }
  383. func clusterVersionFromStore(st store.Store) *semver.Version {
  384. e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false)
  385. if err != nil {
  386. if isKeyNotFound(err) {
  387. return nil
  388. }
  389. log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err)
  390. }
  391. return semver.Must(semver.NewVersion(*e.Node.Value))
  392. }
  393. // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
  394. // with the existing cluster. If the validation succeeds, it assigns the IDs
  395. // from the existing cluster to the local cluster.
  396. // If the validation fails, an error will be returned.
  397. func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
  398. ems := existing.Members()
  399. lms := local.Members()
  400. if len(ems) != len(lms) {
  401. return fmt.Errorf("member count is unequal")
  402. }
  403. sort.Sort(SortableMemberSliceByPeerURLs(ems))
  404. sort.Sort(SortableMemberSliceByPeerURLs(lms))
  405. for i := range ems {
  406. // TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
  407. if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
  408. return fmt.Errorf("unmatched member while checking PeerURLs")
  409. }
  410. lms[i].ID = ems[i].ID
  411. }
  412. local.members = make(map[types.ID]*Member)
  413. for _, m := range lms {
  414. local.members[m.ID] = m
  415. }
  416. return nil
  417. }