Browse Source

Merge pull request #1001 from coreos/sender

etcdhttp: non-blocking sender
Xiang Li 11 years ago
parent
commit
55b4267c30
1 changed files with 32 additions and 27 deletions
  1. 32 27
      etcdserver/etcdhttp/http.go

+ 32 - 27
etcdserver/etcdhttp/http.go

@@ -81,37 +81,41 @@ const DefaultTimeout = 500 * time.Millisecond
 func Sender(p Peers) func(msgs []raftpb.Message) {
 func Sender(p Peers) func(msgs []raftpb.Message) {
 	return func(msgs []raftpb.Message) {
 	return func(msgs []raftpb.Message) {
 		for _, m := range msgs {
 		for _, m := range msgs {
-			// TODO: create workers that deal with message sending
-			// concurrently as to not block progress
-			for {
-				url := p.Pick(m.To)
-				if url == "" {
-					// TODO: unknown peer id.. what do we do? I
-					// don't think his should ever happen, need to
-					// look into this further.
-					log.Println("etcdhttp: no addr for %d", m.To)
-					break
-				}
-
-				url += "/raft"
-
-				// TODO: don't block. we should be able to have 1000s
-				// of messages out at a time.
-				data, err := m.Marshal()
-				if err != nil {
-					log.Println("etcdhttp: dropping message:", err)
-					break // drop bad message
-				}
-				if httpPost(url, data) {
-					break // success
-				}
-
-				// TODO: backoff
-			}
+			// TODO: reuse go routines
+			// limit the number of outgoing connections for the same receiver
+			go send(p, m)
 		}
 		}
 	}
 	}
 }
 }
 
 
+func send(p Peers, m raftpb.Message) {
+	// TODO (xiangli): reasonable retry logic
+	for i := 0; i < 3; i++ {
+		url := p.Pick(m.To)
+		if url == "" {
+			// TODO: unknown peer id.. what do we do? I
+			// don't think his should ever happen, need to
+			// look into this further.
+			log.Println("etcdhttp: no addr for %d", m.To)
+			return
+		}
+
+		url += "/raft"
+
+		// TODO: don't block. we should be able to have 1000s
+		// of messages out at a time.
+		data, err := m.Marshal()
+		if err != nil {
+			log.Println("etcdhttp: dropping message:", err)
+			return // drop bad message
+		}
+		if httpPost(url, data) {
+			return // success
+		}
+		// TODO: backoff
+	}
+}
+
 func httpPost(url string, data []byte) bool {
 func httpPost(url string, data []byte) bool {
 	// TODO: set timeouts
 	// TODO: set timeouts
 	resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
 	resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
@@ -119,6 +123,7 @@ func httpPost(url string, data []byte) bool {
 		elog.TODO()
 		elog.TODO()
 		return false
 		return false
 	}
 	}
+	resp.Body.Close()
 	if resp.StatusCode != 200 {
 	if resp.StatusCode != 200 {
 		elog.TODO()
 		elog.TODO()
 		return false
 		return false