sender.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  import (
  	"bytes"
  	"fmt"
  	"io"
  	"io/ioutil"
  	"log"
  	"net/http"
  	"strconv"
  	"time"

  	"github.com/coreos/etcd/etcdserver/stats"
  	"github.com/coreos/etcd/raft/raftpb"
  )
  // raftPrefix is the URL path that send appends to a member's peer URL to
  // build the address raft messages are POSTed to.
  24. const raftPrefix = "/raft"
  25. // Sender creates the default production sender used to transport raft messages
  26. // in the cluster. The returned sender will update the given ServerStats and
  27. // LeaderStats appropriately.
  28. func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
  29. c := &http.Client{Transport: t}
  30. return func(msgs []raftpb.Message) {
  31. for _, m := range msgs {
  32. // TODO: reuse go routines
  33. // limit the number of outgoing connections for the same receiver
  34. go send(c, cl, m, ss, ls)
  35. }
  36. }
  37. }
  38. // send uses the given client to send a message to a member in the given
  39. // ClusterStore, retrying up to 3 times for each message. The given
  40. // ServerStats and LeaderStats are updated appropriately
  41. func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
  42. cid := cl.ID()
  43. // TODO (xiangli): reasonable retry logic
  44. for i := 0; i < 3; i++ {
  45. memb := cl.Member(m.To)
  46. if memb == nil {
  47. // TODO: unknown peer id.. what do we do? I
  48. // don't think his should ever happen, need to
  49. // look into this further.
  50. log.Printf("etcdhttp: no member for %d", m.To)
  51. return
  52. }
  53. u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
  54. // TODO: don't block. we should be able to have 1000s
  55. // of messages out at a time.
  56. data, err := m.Marshal()
  57. if err != nil {
  58. log.Println("etcdhttp: dropping message:", err)
  59. return // drop bad message
  60. }
  61. if m.Type == raftpb.MsgApp {
  62. ss.SendAppendReq(len(data))
  63. }
  64. to := idAsHex(m.To)
  65. fs := ls.Follower(to)
  66. start := time.Now()
  67. sent := httpPost(c, u, cid, data)
  68. end := time.Now()
  69. if sent {
  70. fs.Succ(end.Sub(start))
  71. return
  72. }
  73. fs.Fail()
  74. // TODO: backoff
  75. }
  76. }
  // httpPost POSTs data to url using client c and reports whether the receiver
  // accepted it.
  //
  // The request carries a "Content-Type: application/protobuf" header and the
  // cluster ID cid, hex-encoded, in the "X-Etcd-Cluster-ID" header.
  //
  // It returns true only for a 204 No Content response. It returns false if
  // the request cannot be built, if the round trip fails, or for any other
  // status code. It panics (via log.Panicf) on 412 Precondition Failed and on
  // 403 Forbidden.
  func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
  	req, err := http.NewRequest("POST", url, bytes.NewReader(data))
  	if err != nil {
  		// TODO: log the error?
  		return false
  	}
  	req.Header.Set("Content-Type", "application/protobuf")
  	req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))

  	resp, err := c.Do(req)
  	if err != nil {
  		// TODO: log the error?
  		return false
  	}
  	// Read the body to EOF before closing it. A body that is closed while
  	// unread data remains prevents the transport from reusing the keep-alive
  	// connection, so every failed attempt would otherwise cost a new dial.
  	// The copy error is deliberately ignored: only the status code matters
  	// here, and a broken body just means the connection is not reused.
  	_, _ = io.Copy(ioutil.Discard, resp.Body)
  	resp.Body.Close()

  	switch resp.StatusCode {
  	case http.StatusPreconditionFailed:
  		// TODO: shutdown the etcdserver gracefully?
  		log.Panicf("clusterID mismatch")
  		return false // not reached; the compiler does not treat log.Panicf as terminating
  	case http.StatusForbidden:
  		// TODO: stop the server
  		log.Panicf("the member has been removed")
  		return false // not reached; see above
  	case http.StatusNoContent:
  		return true
  	default:
  		return false
  	}
  }
  107. }