cluster_store.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package etcdserver
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/store"
)
  11. const (
  12. raftPrefix = "/raft"
  13. )
  14. type ClusterStore interface {
  15. Get() Cluster
  16. Delete(id int64)
  17. }
  18. type clusterStore struct {
  19. Store store.Store
  20. }
  21. func NewClusterStore(st store.Store, c Cluster) ClusterStore {
  22. cls := &clusterStore{Store: st}
  23. for _, m := range c {
  24. cls.add(*m)
  25. }
  26. return cls
  27. }
  28. // add puts a new Member into the store.
  29. // A Member with a matching id must not exist.
  30. func (s *clusterStore) add(m Member) {
  31. b, err := json.Marshal(m)
  32. if err != nil {
  33. log.Panicf("marshal peer info error: %v", err)
  34. }
  35. if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil {
  36. log.Panicf("add member should never fail: %v", err)
  37. }
  38. }
  39. // TODO(philips): keep the latest copy without going to the store to avoid the
  40. // lock here.
  41. func (s *clusterStore) Get() Cluster {
  42. c := &Cluster{}
  43. e, err := s.Store.Get(machineKVPrefix, true, false)
  44. if err != nil {
  45. log.Panicf("get member should never fail: %v", err)
  46. }
  47. for _, n := range e.Node.Nodes {
  48. m := Member{}
  49. if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
  50. log.Panicf("unmarshal peer error: %v", err)
  51. }
  52. err := c.Add(m)
  53. if err != nil {
  54. log.Panicf("add member to cluster should never fail: %v", err)
  55. }
  56. }
  57. return *c
  58. }
  59. // Delete removes a member from the store.
  60. // The given id MUST exist.
  61. func (s *clusterStore) Delete(id int64) {
  62. p := s.Get().FindID(id).storeKey()
  63. if _, err := s.Store.Delete(p, false, false); err != nil {
  64. log.Panicf("delete peer should never fail: %v", err)
  65. }
  66. }
  67. // addScheme adds the protocol prefix to a string; currently only HTTP
  68. // TODO: improve this when implementing TLS
  69. func addScheme(addr string) string {
  70. return fmt.Sprintf("http://%s", addr)
  71. }
  72. func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
  73. c := &http.Client{Transport: t}
  74. scheme := "http"
  75. if t.TLSClientConfig != nil {
  76. scheme = "https"
  77. }
  78. return func(msgs []raftpb.Message) {
  79. for _, m := range msgs {
  80. // TODO: reuse go routines
  81. // limit the number of outgoing connections for the same receiver
  82. go send(c, scheme, cls, m)
  83. }
  84. }
  85. }
  86. func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
  87. // TODO (xiangli): reasonable retry logic
  88. for i := 0; i < 3; i++ {
  89. addr := cls.Get().Pick(m.To)
  90. if addr == "" {
  91. // TODO: unknown peer id.. what do we do? I
  92. // don't think his should ever happen, need to
  93. // look into this further.
  94. log.Printf("etcdhttp: no addr for %d", m.To)
  95. return
  96. }
  97. url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
  98. // TODO: don't block. we should be able to have 1000s
  99. // of messages out at a time.
  100. data, err := m.Marshal()
  101. if err != nil {
  102. log.Println("etcdhttp: dropping message:", err)
  103. return // drop bad message
  104. }
  105. if httpPost(c, url, data) {
  106. return // success
  107. }
  108. // TODO: backoff
  109. }
  110. }
  111. func httpPost(c *http.Client, url string, data []byte) bool {
  112. resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
  113. if err != nil {
  114. // TODO: log the error?
  115. return false
  116. }
  117. resp.Body.Close()
  118. if resp.StatusCode != http.StatusNoContent {
  119. // TODO: log the error?
  120. return false
  121. }
  122. return true
  123. }