cluster_store.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package etcdserver
  import (
  	"bytes"
  	"encoding/json"
  	"fmt"
  	"io"
  	"io/ioutil"
  	"log"
  	"net/http"
  	"time"

  	etcdErr "github.com/coreos/etcd/error"
  	"github.com/coreos/etcd/etcdserver/stats"
  	"github.com/coreos/etcd/raft/raftpb"
  	"github.com/coreos/etcd/store"
  )
  // raftPrefix is the URL path appended to a peer's address when raft
  // messages are POSTed to it (see send).
  const raftPrefix = "/raft"

  // Suffixes appended to a member's store key to name the two child nodes
  // holding its JSON-encoded Attributes and RaftAttributes.
  const (
  	attributesSuffix     = "/attributes"
  	raftAttributesSuffix = "/raftAttributes"
  )
  19. type ClusterStore interface {
  20. Add(m Member)
  21. Get() Cluster
  22. Remove(id uint64)
  23. }
  24. type clusterStore struct {
  25. Store store.Store
  26. }
  27. // Add puts a new Member into the store.
  28. // A Member with a matching id must not exist.
  29. func (s *clusterStore) Add(m Member) {
  30. b, err := json.Marshal(m.RaftAttributes)
  31. if err != nil {
  32. log.Panicf("marshal error: %v", err)
  33. }
  34. if _, err := s.Store.Create(m.storeKey()+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
  35. log.Panicf("add raftAttributes should never fail: %v", err)
  36. }
  37. b, err = json.Marshal(m.Attributes)
  38. if err != nil {
  39. log.Panicf("marshal error: %v", err)
  40. }
  41. if _, err := s.Store.Create(m.storeKey()+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
  42. log.Panicf("add attributes should never fail: %v", err)
  43. }
  44. }
  45. // TODO(philips): keep the latest copy without going to the store to avoid the
  46. // lock here.
  47. func (s *clusterStore) Get() Cluster {
  48. c := &Cluster{}
  49. e, err := s.Store.Get(membersKVPrefix, true, true)
  50. if err != nil {
  51. if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
  52. return *c
  53. }
  54. log.Panicf("get member should never fail: %v", err)
  55. }
  56. for _, n := range e.Node.Nodes {
  57. m, err := nodeToMember(n)
  58. if err != nil {
  59. log.Panicf("unexpected nodeToMember error: %v", err)
  60. }
  61. if err := c.Add(m); err != nil {
  62. log.Panicf("add member to cluster should never fail: %v", err)
  63. }
  64. }
  65. return *c
  66. }
  67. // nodeToMember builds member through a store node.
  68. // the child nodes of the given node should be sorted by key.
  69. func nodeToMember(n *store.NodeExtern) (Member, error) {
  70. m := Member{ID: parseMemberID(n.Key)}
  71. if len(n.Nodes) != 2 {
  72. return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
  73. }
  74. if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
  75. return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
  76. }
  77. if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
  78. return m, fmt.Errorf("unmarshal attributes error: %v", err)
  79. }
  80. if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
  81. return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
  82. }
  83. if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
  84. return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
  85. }
  86. return m, nil
  87. }
  88. // Remove removes a member from the store.
  89. // The given id MUST exist.
  90. func (s *clusterStore) Remove(id uint64) {
  91. p := s.Get().FindID(id).storeKey()
  92. if _, err := s.Store.Delete(p, true, true); err != nil {
  93. log.Panicf("delete peer should never fail: %v", err)
  94. }
  95. }
  96. // Sender creates the default production sender used to transport raft messages
  97. // in the cluster. The returned sender will update the given ServerStats and
  98. // LeaderStats appropriately.
  99. func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
  100. c := &http.Client{Transport: t}
  101. return func(msgs []raftpb.Message) {
  102. for _, m := range msgs {
  103. // TODO: reuse go routines
  104. // limit the number of outgoing connections for the same receiver
  105. go send(c, cls, m, ss, ls)
  106. }
  107. }
  108. }
  109. // send uses the given client to send a message to a member in the given
  110. // ClusterStore, retrying up to 3 times for each message. The given
  111. // ServerStats and LeaderStats are updated appropriately
  112. func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
  113. // TODO (xiangli): reasonable retry logic
  114. for i := 0; i < 3; i++ {
  115. u := cls.Get().Pick(m.To)
  116. if u == "" {
  117. // TODO: unknown peer id.. what do we do? I
  118. // don't think his should ever happen, need to
  119. // look into this further.
  120. log.Printf("etcdhttp: no addr for %d", m.To)
  121. return
  122. }
  123. u = fmt.Sprintf("%s%s", u, raftPrefix)
  124. // TODO: don't block. we should be able to have 1000s
  125. // of messages out at a time.
  126. data, err := m.Marshal()
  127. if err != nil {
  128. log.Println("etcdhttp: dropping message:", err)
  129. return // drop bad message
  130. }
  131. if m.Type == raftpb.MsgApp {
  132. ss.SendAppendReq(len(data))
  133. }
  134. to := idAsHex(m.To)
  135. fs := ls.Follower(to)
  136. start := time.Now()
  137. sent := httpPost(c, u, data)
  138. end := time.Now()
  139. if sent {
  140. fs.Succ(end.Sub(start))
  141. return
  142. }
  143. fs.Fail()
  144. // TODO: backoff
  145. }
  146. }
  // httpPost POSTs data to url with content type "application/protobuf"
  // using c. It returns true only when the request completes and the peer
  // answers 204 No Content. A transport error or any other status code
  // yields false.
  func httpPost(c *http.Client, url string, data []byte) bool {
  	resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
  	if err != nil {
  		// TODO: log the error?
  		return false
  	}
  	defer resp.Body.Close()
  	// net/http can only return a keep-alive connection to the idle pool once
  	// the body has been read to EOF and closed. Draining it lets a non-204
  	// reply (e.g. an error page) leave the connection reusable for later
  	// sends. The drain is best-effort: an error here only costs that
  	// connection, so it is deliberately ignored.
  	io.Copy(ioutil.Discard, resp.Body)
  	if resp.StatusCode != http.StatusNoContent {
  		// TODO: log the error?
  		return false
  	}
  	return true
  }