sender.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "bytes"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "time"
  20. "github.com/coreos/etcd/etcdserver/stats"
  21. "github.com/coreos/etcd/pkg/types"
  22. "github.com/coreos/etcd/raft/raftpb"
  23. )
  // raftPrefix is the URL path that send appends to a member's peer URL to
  // form the address raft messages are POSTed to.
  24. const raftPrefix = "/raft"
  25. // Sender creates the default production sender used to transport raft messages
  26. // in the cluster. The returned sender will update the given ServerStats and
  27. // LeaderStats appropriately.
  28. func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
  29. c := &http.Client{Transport: t}
  30. return func(msgs []raftpb.Message) {
  31. for _, m := range msgs {
  32. // TODO: reuse go routines
  33. // limit the number of outgoing connections for the same receiver
  34. go send(c, cl, m, ss, ls)
  35. }
  36. }
  37. }
  38. // send uses the given client to send a message to a member in the given
  39. // ClusterStore, retrying up to 3 times for each message. The given
  40. // ServerStats and LeaderStats are updated appropriately
  41. func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
  42. to := types.ID(m.To)
  43. cid := cl.ID()
  44. // TODO (xiangli): reasonable retry logic
  45. for i := 0; i < 3; i++ {
  46. memb := cl.Member(to)
  47. if memb == nil {
  48. if !cl.IsIDRemoved(to) {
  49. // TODO: unknown peer id.. what do we do? I
  50. // don't think his should ever happen, need to
  51. // look into this further.
  52. log.Printf("etcdserver: error sending message to unknown receiver %s", to.String())
  53. }
  54. return
  55. }
  56. u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
  57. // TODO: don't block. we should be able to have 1000s
  58. // of messages out at a time.
  59. data, err := m.Marshal()
  60. if err != nil {
  61. log.Println("sender: dropping message:", err)
  62. return // drop bad message
  63. }
  64. if m.Type == raftpb.MsgApp {
  65. ss.SendAppendReq(len(data))
  66. }
  67. fs := ls.Follower(to.String())
  68. start := time.Now()
  69. sent := httpPost(c, u, cid, data)
  70. end := time.Now()
  71. if sent {
  72. fs.Succ(end.Sub(start))
  73. return
  74. }
  75. fs.Fail()
  76. // TODO: backoff
  77. }
  78. }
  79. // httpPost POSTs a data payload to a url using the given client. Returns true
  80. // if the POST succeeds, false on any failure.
  81. func httpPost(c *http.Client, url string, cid types.ID, data []byte) bool {
  82. req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
  83. if err != nil {
  84. // TODO: log the error?
  85. return false
  86. }
  87. req.Header.Set("Content-Type", "application/protobuf")
  88. req.Header.Set("X-Etcd-Cluster-ID", cid.String())
  89. resp, err := c.Do(req)
  90. if err != nil {
  91. // TODO: log the error?
  92. return false
  93. }
  94. resp.Body.Close()
  95. switch resp.StatusCode {
  96. case http.StatusPreconditionFailed:
  97. // TODO: shutdown the etcdserver gracefully?
  98. log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s). Exiting.", resp.Header.Get("X-Etcd-Cluster-ID"), cid.String())
  99. return false
  100. case http.StatusForbidden:
  101. // TODO: stop the server
  102. log.Fatalf("etcd: this member has been permanently removed from the cluster. Exiting.")
  103. return false
  104. case http.StatusNoContent:
  105. return true
  106. default:
  107. return false
  108. }
  109. }