sender.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "bytes"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "strconv"
  20. "time"
  21. "github.com/coreos/etcd/etcdserver/stats"
  22. "github.com/coreos/etcd/pkg/strutil"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. )
  25. const raftPrefix = "/raft"
  26. // Sender creates the default production sender used to transport raft messages
  27. // in the cluster. The returned sender will update the given ServerStats and
  28. // LeaderStats appropriately.
  29. func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
  30. c := &http.Client{Transport: t}
  31. return func(msgs []raftpb.Message) {
  32. for _, m := range msgs {
  33. // TODO: reuse go routines
  34. // limit the number of outgoing connections for the same receiver
  35. go send(c, cl, m, ss, ls)
  36. }
  37. }
  38. }
  39. // send uses the given client to send a message to a member in the given
  40. // ClusterStore, retrying up to 3 times for each message. The given
  41. // ServerStats and LeaderStats are updated appropriately
  42. func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
  43. cid := cl.ID()
  44. // TODO (xiangli): reasonable retry logic
  45. for i := 0; i < 3; i++ {
  46. memb := cl.Member(m.To)
  47. if memb == nil {
  48. // TODO: unknown peer id.. what do we do? I
  49. // don't think his should ever happen, need to
  50. // look into this further.
  51. log.Printf("etcdhttp: no member for %d", m.To)
  52. return
  53. }
  54. u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
  55. // TODO: don't block. we should be able to have 1000s
  56. // of messages out at a time.
  57. data, err := m.Marshal()
  58. if err != nil {
  59. log.Println("etcdhttp: dropping message:", err)
  60. return // drop bad message
  61. }
  62. if m.Type == raftpb.MsgApp {
  63. ss.SendAppendReq(len(data))
  64. }
  65. to := strutil.IDAsHex(m.To)
  66. fs := ls.Follower(to)
  67. start := time.Now()
  68. sent := httpPost(c, u, cid, data)
  69. end := time.Now()
  70. if sent {
  71. fs.Succ(end.Sub(start))
  72. return
  73. }
  74. fs.Fail()
  75. // TODO: backoff
  76. }
  77. }
  78. // httpPost POSTs a data payload to a url using the given client. Returns true
  79. // if the POST succeeds, false on any failure.
  80. func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
  81. req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
  82. if err != nil {
  83. // TODO: log the error?
  84. return false
  85. }
  86. req.Header.Set("Content-Type", "application/protobuf")
  87. req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
  88. resp, err := c.Do(req)
  89. if err != nil {
  90. // TODO: log the error?
  91. return false
  92. }
  93. resp.Body.Close()
  94. switch resp.StatusCode {
  95. case http.StatusPreconditionFailed:
  96. // TODO: shutdown the etcdserver gracefully?
  97. log.Panicf("clusterID mismatch")
  98. return false
  99. case http.StatusForbidden:
  100. // TODO: stop the server
  101. log.Panicf("the member has been removed")
  102. return false
  103. case http.StatusNoContent:
  104. return true
  105. default:
  106. return false
  107. }
  108. }