|
|
@@ -78,25 +78,20 @@ func (s *clusterStore) Delete(id int64) {
|
|
|
func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
|
|
|
c := &http.Client{Transport: t}
|
|
|
|
|
|
- scheme := "http"
|
|
|
- if t.TLSClientConfig != nil {
|
|
|
- scheme = "https"
|
|
|
- }
|
|
|
-
|
|
|
return func(msgs []raftpb.Message) {
|
|
|
for _, m := range msgs {
|
|
|
// TODO: reuse go routines
|
|
|
// limit the number of outgoing connections for the same receiver
|
|
|
- go send(c, scheme, cls, m)
|
|
|
+ go send(c, cls, m)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
|
|
|
+func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
|
|
|
// TODO (xiangli): reasonable retry logic
|
|
|
for i := 0; i < 3; i++ {
|
|
|
- addr := cls.Get().Pick(m.To)
|
|
|
- if addr == "" {
|
|
|
+ u := cls.Get().Pick(m.To)
|
|
|
+ if u == "" {
|
|
|
// TODO: unknown peer id.. what do we do? I
|
|
|
// don't think his should ever happen, need to
|
|
|
// look into this further.
|
|
|
@@ -104,7 +99,7 @@ func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
|
|
|
+ u = fmt.Sprintf("%s%s", u, raftPrefix)
|
|
|
|
|
|
// TODO: don't block. we should be able to have 1000s
|
|
|
// of messages out at a time.
|
|
|
@@ -113,7 +108,7 @@ func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
|
|
|
log.Println("etcdhttp: dropping message:", err)
|
|
|
return // drop bad message
|
|
|
}
|
|
|
- if httpPost(c, url, data) {
|
|
|
+ if httpPost(c, u, data) {
|
|
|
return // success
|
|
|
}
|
|
|
// TODO: backoff
|