cluster_store.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package etcdserver
  import (
  	"bytes"
  	"encoding/json"
  	"fmt"
  	"io"
  	"io/ioutil"
  	"log"
  	"net/http"

  	etcdErr "github.com/coreos/etcd/error"
  	"github.com/coreos/etcd/raft/raftpb"
  	"github.com/coreos/etcd/store"
  )
  // Path fragments used by the cluster store and by the raft message sender.
  const (
  	// attributesSuffix is appended to a member's store key to form the key
  	// under which its JSON-encoded Attributes are kept.
  	attributesSuffix = "/attributes"

  	// raftAttributesSuffix is appended to a member's store key to form the
  	// key under which its JSON-encoded RaftAttributes are kept.
  	raftAttributesSuffix = "/raftAttributes"

  	// raftPrefix is appended to a peer's address to build the URL that raft
  	// messages are posted to.
  	raftPrefix = "/raft"
  )
  17. type ClusterStore interface {
  18. Add(m Member)
  19. Get() Cluster
  20. Remove(id uint64)
  21. }
  22. type clusterStore struct {
  23. Store store.Store
  24. }
  25. func NewClusterStore(st store.Store, c Cluster) ClusterStore {
  26. cls := &clusterStore{Store: st}
  27. for _, m := range c {
  28. cls.Add(*m)
  29. }
  30. return cls
  31. }
  32. // Add puts a new Member into the store.
  33. // A Member with a matching id must not exist.
  34. func (s *clusterStore) Add(m Member) {
  35. b, err := json.Marshal(m.RaftAttributes)
  36. if err != nil {
  37. log.Panicf("marshal error: %v", err)
  38. }
  39. if _, err := s.Store.Create(m.storeKey()+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
  40. log.Panicf("add raftAttributes should never fail: %v", err)
  41. }
  42. b, err = json.Marshal(m.Attributes)
  43. if err != nil {
  44. log.Panicf("marshal error: %v", err)
  45. }
  46. if _, err := s.Store.Create(m.storeKey()+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
  47. log.Panicf("add attributes should never fail: %v", err)
  48. }
  49. }
  50. // TODO(philips): keep the latest copy without going to the store to avoid the
  51. // lock here.
  52. func (s *clusterStore) Get() Cluster {
  53. c := &Cluster{}
  54. e, err := s.Store.Get(machineKVPrefix, true, true)
  55. if err != nil {
  56. if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
  57. return *c
  58. }
  59. log.Panicf("get member should never fail: %v", err)
  60. }
  61. for _, n := range e.Node.Nodes {
  62. m, err := nodeToMember(n)
  63. if err != nil {
  64. log.Panicf("unexpected nodeToMember error: %v", err)
  65. }
  66. if err := c.Add(m); err != nil {
  67. log.Panicf("add member to cluster should never fail: %v", err)
  68. }
  69. }
  70. return *c
  71. }
  72. // nodeToMember builds member through a store node.
  73. // the child nodes of the given node should be sorted by key.
  74. func nodeToMember(n *store.NodeExtern) (Member, error) {
  75. m := Member{ID: parseMemberID(n.Key)}
  76. if len(n.Nodes) != 2 {
  77. return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
  78. }
  79. if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
  80. return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
  81. }
  82. if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
  83. return m, fmt.Errorf("unmarshal attributes error: %v", err)
  84. }
  85. if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
  86. return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
  87. }
  88. if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
  89. return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
  90. }
  91. return m, nil
  92. }
  93. // Remove removes a member from the store.
  94. // The given id MUST exist.
  95. func (s *clusterStore) Remove(id uint64) {
  96. p := s.Get().FindID(id).storeKey()
  97. if _, err := s.Store.Delete(p, true, true); err != nil {
  98. log.Panicf("delete peer should never fail: %v", err)
  99. }
  100. }
  101. func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
  102. c := &http.Client{Transport: t}
  103. return func(msgs []raftpb.Message) {
  104. for _, m := range msgs {
  105. // TODO: reuse go routines
  106. // limit the number of outgoing connections for the same receiver
  107. go send(c, cls, m)
  108. }
  109. }
  110. }
  111. func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
  112. // TODO (xiangli): reasonable retry logic
  113. for i := 0; i < 3; i++ {
  114. u := cls.Get().Pick(m.To)
  115. if u == "" {
  116. // TODO: unknown peer id.. what do we do? I
  117. // don't think his should ever happen, need to
  118. // look into this further.
  119. log.Printf("etcdhttp: no addr for %d", m.To)
  120. return
  121. }
  122. u = fmt.Sprintf("%s%s", u, raftPrefix)
  123. // TODO: don't block. we should be able to have 1000s
  124. // of messages out at a time.
  125. data, err := m.Marshal()
  126. if err != nil {
  127. log.Println("etcdhttp: dropping message:", err)
  128. return // drop bad message
  129. }
  130. if httpPost(c, u, data) {
  131. return // success
  132. }
  133. // TODO: backoff
  134. }
  135. }
  // httpPost posts data to url with content type "application/protobuf" using
  // the client c. It reports true only when the request succeeds and the
  // response status is 204 No Content; transport errors and any other status
  // yield false.
  //
  // The response body is drained (up to a fixed bound) before it is closed so
  // that the underlying keep-alive connection can be reused for later posts.
  func httpPost(c *http.Client, url string, data []byte) bool {
  	// maxDrain bounds how much of an unexpected response body is read, so a
  	// misbehaving peer cannot keep the caller busy indefinitely.
  	const maxDrain = 64 << 10

  	resp, err := c.Post(url, "application/protobuf", bytes.NewReader(data))
  	if err != nil {
  		// TODO: log the error?
  		return false
  	}
  	defer resp.Body.Close()

  	// Draining is best effort: a read error only costs connection reuse, so
  	// the result is deliberately ignored.
  	_, _ = io.Copy(ioutil.Discard, io.LimitReader(resp.Body, maxDrain))

  	// TODO: log unexpected status codes?
  	return resp.StatusCode == http.StatusNoContent
  }