Browse Source

chore(server): remove useless CancelWhenTimeout

Setting request timeout is covered by go-httpclient now.
Yicheng Qin 11 years ago
parent
commit
349a802a82
2 changed files with 7 additions and 26 deletions
  1. 3 6
      server/peer_server.go
  2. 4 20
      server/transporter.go

+ 3 - 6
server/peer_server.go

@@ -300,13 +300,12 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 
 
 // getVersion fetches the peer version of a cluster.
 // getVersion fetches the peer version of a cluster.
 func getVersion(t *transporter, versionURL url.URL) (int, error) {
 func getVersion(t *transporter, versionURL url.URL) (int, error) {
-	resp, req, err := t.Get(versionURL.String())
+	resp, _, err := t.Get(versionURL.String())
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
 	}
 	}
 	defer resp.Body.Close()
 	defer resp.Body.Close()
 
 
-	t.CancelWhenTimeout(req)
 	body, err := ioutil.ReadAll(resp.Body)
 	body, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
@@ -386,7 +385,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 
 
 	log.Debugf("Send Join Request to %s", joinURL.String())
 	log.Debugf("Send Join Request to %s", joinURL.String())
 
 
-	resp, req, err := t.Post(joinURL.String(), &b)
+	resp, _, err := t.Post(joinURL.String(), &b)
 
 
 	for {
 	for {
 		if err != nil {
 		if err != nil {
@@ -395,8 +394,6 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 		if resp != nil {
 		if resp != nil {
 			defer resp.Body.Close()
 			defer resp.Body.Close()
 
 
-			t.CancelWhenTimeout(req)
-
 			if resp.StatusCode == http.StatusOK {
 			if resp.StatusCode == http.StatusOK {
 				b, _ := ioutil.ReadAll(resp.Body)
 				b, _ := ioutil.ReadAll(resp.Body)
 				s.joinIndex, _ = binary.Uvarint(b)
 				s.joinIndex, _ = binary.Uvarint(b)
@@ -406,7 +403,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 				address := resp.Header.Get("Location")
 				address := resp.Header.Get("Location")
 				log.Debugf("Send Join Request to %s", address)
 				log.Debugf("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
 				json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
-				resp, req, err = t.Post(address, &b)
+				resp, _, err = t.Post(address, &b)
 
 
 			} else if resp.StatusCode == http.StatusBadRequest {
 			} else if resp.StatusCode == http.StatusBadRequest {
 				log.Debug("Reach max number peers in the cluster")
 				log.Debug("Reach max number peers in the cluster")

+ 4 - 20
server/transporter.go

@@ -87,7 +87,7 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
 
 
 	start := time.Now()
 	start := time.Now()
 
 
-	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
+	resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
 
 
 	end := time.Now()
 	end := time.Now()
 
 
@@ -106,8 +106,6 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
 	if resp != nil {
 	if resp != nil {
 		defer resp.Body.Close()
 		defer resp.Body.Close()
 
 
-		t.CancelWhenTimeout(httpRequest)
-
 		aeresp := &raft.AppendEntriesResponse{}
 		aeresp := &raft.AppendEntriesResponse{}
 		if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
 		if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
 			log.Warn("transporter.ae.decoding.error:", err)
 			log.Warn("transporter.ae.decoding.error:", err)
@@ -131,7 +129,7 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *
 	u, _ := t.registry.PeerURL(peer.Name)
 	u, _ := t.registry.PeerURL(peer.Name)
 	log.Debugf("Send Vote from %s to %s", server.Name(), u)
 	log.Debugf("Send Vote from %s to %s", server.Name(), u)
 
 
-	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
+	resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
 
 
 	if err != nil {
 	if err != nil {
 		log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
 		log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
@@ -140,8 +138,6 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *
 	if resp != nil {
 	if resp != nil {
 		defer resp.Body.Close()
 		defer resp.Body.Close()
 
 
-		t.CancelWhenTimeout(httpRequest)
-
 		rvrsp := &raft.RequestVoteResponse{}
 		rvrsp := &raft.RequestVoteResponse{}
 		if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
 		if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
 			log.Warn("transporter.vr.decoding.error:", err)
 			log.Warn("transporter.vr.decoding.error:", err)
@@ -164,7 +160,7 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r
 	u, _ := t.registry.PeerURL(peer.Name)
 	u, _ := t.registry.PeerURL(peer.Name)
 	log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
 	log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
 
 
-	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
+	resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
 
 
 	if err != nil {
 	if err != nil {
 		log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
 		log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
@@ -173,8 +169,6 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r
 	if resp != nil {
 	if resp != nil {
 		defer resp.Body.Close()
 		defer resp.Body.Close()
 
 
-		t.CancelWhenTimeout(httpRequest)
-
 		ssrsp := &raft.SnapshotResponse{}
 		ssrsp := &raft.SnapshotResponse{}
 		if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
 		if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
 			log.Warn("transporter.ss.decoding.error:", err)
 			log.Warn("transporter.ss.decoding.error:", err)
@@ -197,7 +191,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft
 	u, _ := t.registry.PeerURL(peer.Name)
 	u, _ := t.registry.PeerURL(peer.Name)
 	log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
 	log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
 
 
-	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
+	resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
 
 
 	if err != nil {
 	if err != nil {
 		log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
 		log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
@@ -206,8 +200,6 @@ func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft
 	if resp != nil {
 	if resp != nil {
 		defer resp.Body.Close()
 		defer resp.Body.Close()
 
 
-		t.CancelWhenTimeout(httpRequest)
-
 		ssrrsp := &raft.SnapshotRecoveryResponse{}
 		ssrrsp := &raft.SnapshotRecoveryResponse{}
 		if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
 		if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
 			log.Warn("transporter.ssr.decoding.error:", err)
 			log.Warn("transporter.ssr.decoding.error:", err)
@@ -232,11 +224,3 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 	resp, err := t.client.Do(req)
 	resp, err := t.client.Do(req)
 	return resp, req, err
 	return resp, req, err
 }
 }
-
-// Cancel the on fly HTTP transaction when timeout happens.
-func (t *transporter) CancelWhenTimeout(req *http.Request) {
-	go func() {
-		time.Sleep(t.requestTimeout)
-		t.transport.CancelRequest(req)
-	}()
-}