cluster_store.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package etcdserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "github.com/coreos/etcd/raft/raftpb"
  9. "github.com/coreos/etcd/store"
  10. )
  11. const (
  12. raftPrefix = "/raft"
  13. )
  14. type ClusterStore interface {
  15. Create(m Member)
  16. Get() Cluster
  17. Delete(id int64)
  18. }
  19. type clusterStore struct {
  20. Store store.Store
  21. }
  22. func NewClusterStore(st store.Store, c Cluster) ClusterStore {
  23. cls := &clusterStore{Store: st}
  24. for _, m := range c {
  25. cls.Create(*m)
  26. }
  27. return cls
  28. }
  29. // Create puts a new Member into the store.
  30. // A Member with a matching id must not exist.
  31. func (s *clusterStore) Create(m Member) {
  32. b, err := json.Marshal(m)
  33. if err != nil {
  34. log.Panicf("marshal peer info error: %v", err)
  35. }
  36. if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil {
  37. log.Panicf("add member should never fail: %v", err)
  38. }
  39. }
  40. // TODO(philips): keep the latest copy without going to the store to avoid the
  41. // lock here.
  42. func (s *clusterStore) Get() Cluster {
  43. c := &Cluster{}
  44. e, err := s.Store.Get(machineKVPrefix, true, false)
  45. if err != nil {
  46. log.Panicf("get member should never fail: %v", err)
  47. }
  48. for _, n := range e.Node.Nodes {
  49. m := Member{}
  50. if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
  51. log.Panicf("unmarshal peer error: %v", err)
  52. }
  53. err := c.Add(m)
  54. if err != nil {
  55. log.Panicf("add member to cluster should never fail: %v", err)
  56. }
  57. }
  58. return *c
  59. }
  60. // Delete removes a member from the store.
  61. // The given id MUST exist.
  62. func (s *clusterStore) Delete(id int64) {
  63. p := s.Get().FindID(id).storeKey()
  64. if _, err := s.Store.Delete(p, false, false); err != nil {
  65. log.Panicf("delete peer should never fail: %v", err)
  66. }
  67. }
  68. func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
  69. c := &http.Client{Transport: t}
  70. return func(msgs []raftpb.Message) {
  71. for _, m := range msgs {
  72. // TODO: reuse go routines
  73. // limit the number of outgoing connections for the same receiver
  74. go send(c, cls, m)
  75. }
  76. }
  77. }
  78. func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
  79. // TODO (xiangli): reasonable retry logic
  80. for i := 0; i < 3; i++ {
  81. u := cls.Get().Pick(m.To)
  82. if u == "" {
  83. // TODO: unknown peer id.. what do we do? I
  84. // don't think his should ever happen, need to
  85. // look into this further.
  86. log.Printf("etcdhttp: no addr for %d", m.To)
  87. return
  88. }
  89. u = fmt.Sprintf("%s%s", u, raftPrefix)
  90. // TODO: don't block. we should be able to have 1000s
  91. // of messages out at a time.
  92. data, err := m.Marshal()
  93. if err != nil {
  94. log.Println("etcdhttp: dropping message:", err)
  95. return // drop bad message
  96. }
  97. if httpPost(c, u, data) {
  98. return // success
  99. }
  100. // TODO: backoff
  101. }
  102. }
  103. func httpPost(c *http.Client, url string, data []byte) bool {
  104. resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
  105. if err != nil {
  106. // TODO: log the error?
  107. return false
  108. }
  109. resp.Body.Close()
  110. if resp.StatusCode != http.StatusNoContent {
  111. // TODO: log the error?
  112. return false
  113. }
  114. return true
  115. }