Browse Source

rafhttp: clean up logging messages

Xiang Li 10 years ago
parent
commit
a7a4233f0b
4 changed files with 15 additions and 22 deletions
  1. 1 11
      rafthttp/peer.go
  2. 1 1
      rafthttp/remote.go
  3. 12 9
      rafthttp/stream.go
  4. 1 1
      rafthttp/transport.go

+ 1 - 11
rafthttp/peer.go

@@ -53,15 +53,6 @@ const (
 	pipelineMsg = "pipeline"
 	pipelineMsg = "pipeline"
 )
 )
 
 
-var (
-	bufSizeMap = map[string]int{
-		streamApp:   streamBufSize,
-		streamAppV2: streamBufSize,
-		streamMsg:   streamBufSize,
-		pipelineMsg: pipelineBufSize,
-	}
-)
-
 type Peer interface {
 type Peer interface {
 	// Send sends the message to the remote peer. The function is non-blocking
 	// Send sends the message to the remote peer. The function is non-blocking
 	// and has no promise that the message will be received by the remote.
 	// and has no promise that the message will be received by the remote.
@@ -170,8 +161,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 					if isMsgSnap(m) {
 					if isMsgSnap(m) {
 						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 					}
 					}
-					log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked",
-						m.Type, p.id, name, bufSizeMap[name])
+					log.Printf("peer: dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 				}
 				}
 			case mm := <-p.recvc:
 			case mm := <-p.recvc:
 				if err := r.Process(context.TODO(), mm); err != nil {
 				if err := r.Process(context.TODO(), mm); err != nil {

+ 1 - 1
rafthttp/remote.go

@@ -39,7 +39,7 @@ func (g *remote) Send(m raftpb.Message) {
 	select {
 	select {
 	case g.pipeline.msgc <- m:
 	case g.pipeline.msgc <- m:
 	default:
 	default:
-		log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize)
+		log.Printf("remote: dropping %s to %s since sending buffer is full", m.Type, g.id)
 	}
 	}
 }
 }
 
 

+ 12 - 9
rafthttp/stream.go

@@ -132,7 +132,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 				reportSentFailure(string(t), linkHeartbeatMessage)
 				reportSentFailure(string(t), linkHeartbeatMessage)
 
 
-				log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
+				log.Printf("rafthttp: failed to heartbeat on stream %s (%v)", t, err)
 				cw.close()
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				heartbeatc, msgc = nil, nil
 				continue
 				continue
@@ -154,7 +154,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(m); err != nil {
 			if err := enc.encode(m); err != nil {
 				reportSentFailure(string(t), m)
 				reportSentFailure(string(t), m)
 
 
-				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
+				log.Printf("rafthttp: failed to send message on stream %s (%v)", t, err)
 				cw.close()
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				heartbeatc, msgc = nil, nil
 				cw.r.ReportUnreachable(m.To)
 				cw.r.ReportUnreachable(m.To)
@@ -170,7 +170,7 @@ func (cw *streamWriter) run() {
 				var err error
 				var err error
 				msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
 				msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
 				if err != nil {
 				if err != nil {
-					log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err)
+					log.Panicf("rafthttp: could not parse term %s to uint (%v)", conn.termStr, err)
 				}
 				}
 				enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
 				enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
 			case streamTypeMsgAppV2:
 			case streamTypeMsgAppV2:
@@ -278,7 +278,7 @@ func (cr *streamReader) run() {
 		}
 		}
 		if err != nil {
 		if err != nil {
 			if err != errUnsupportedStreamType {
 			if err != errUnsupportedStreamType {
-				log.Printf("rafthttp: roundtripping error: %v", err)
+				log.Printf("rafthttp: failed to dial stream %s (%v)", t, err)
 			}
 			}
 		} else {
 		} else {
 			err := cr.decodeLoop(rc, t)
 			err := cr.decodeLoop(rc, t)
@@ -291,7 +291,7 @@ func (cr *streamReader) run() {
 			// heartbeat on the idle stream, so it is expected to time out.
 			// heartbeat on the idle stream, so it is expected to time out.
 			case t == streamTypeMsgApp && isNetworkTimeoutError(err):
 			case t == streamTypeMsgApp && isNetworkTimeoutError(err):
 			default:
 			default:
-				log.Printf("rafthttp: failed to read message on stream %s due to %v", t, err)
+				log.Printf("rafthttp: failed to read message on stream %s (%v)", t, err)
 			}
 			}
 		}
 		}
 		select {
 		select {
@@ -339,7 +339,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 			select {
 			select {
 			case recvc <- m:
 			case recvc <- m:
 			default:
 			default:
-				log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked",
+				log.Printf("rafthttp: dropping %s from %x because receiving buffer is full",
 					m.Type, m.From)
 					m.Type, m.From)
 			}
 			}
 		}
 		}
@@ -384,23 +384,26 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 
 
 	uu := u
 	uu := u
 	uu.Path = path.Join(t.endpoint(), cr.from.String())
 	uu.Path = path.Join(t.endpoint(), cr.from.String())
+
 	req, err := http.NewRequest("GET", uu.String(), nil)
 	req, err := http.NewRequest("GET", uu.String(), nil)
 	if err != nil {
 	if err != nil {
 		cr.picker.unreachable(u)
 		cr.picker.unreachable(u)
-		return nil, fmt.Errorf("new request to %s error: %v", u, err)
+		return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
 	}
 	}
 	req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
 	req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
 	req.Header.Set("X-Raft-To", cr.to.String())
 	req.Header.Set("X-Raft-To", cr.to.String())
 	if t == streamTypeMsgApp {
 	if t == streamTypeMsgApp {
 		req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
 		req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
 	}
 	}
+
 	cr.mu.Lock()
 	cr.mu.Lock()
 	cr.req = req
 	cr.req = req
 	cr.mu.Unlock()
 	cr.mu.Unlock()
+
 	resp, err := cr.tr.RoundTrip(req)
 	resp, err := cr.tr.RoundTrip(req)
 	if err != nil {
 	if err != nil {
 		cr.picker.unreachable(u)
 		cr.picker.unreachable(u)
-		return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
+		return nil, err
 	}
 	}
 
 
 	rv := serverVersion(resp.Header)
 	rv := serverVersion(resp.Header)
@@ -423,7 +426,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		return resp.Body, nil
 		return resp.Body, nil
 	case http.StatusNotFound:
 	case http.StatusNotFound:
 		resp.Body.Close()
 		resp.Body.Close()
-		return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to)
+		return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to)
 	case http.StatusPreconditionFailed:
 	case http.StatusPreconditionFailed:
 		resp.Body.Close()
 		resp.Body.Close()
 		log.Printf("rafthttp: request sent was ignored due to cluster ID mismatch (remote[%s]:%s, local:%s)",
 		log.Printf("rafthttp: request sent was ignored due to cluster ID mismatch (remote[%s]:%s, local:%s)",

+ 1 - 1
rafthttp/transport.go

@@ -150,7 +150,7 @@ func (t *transport) Send(msgs []raftpb.Message) {
 			continue
 			continue
 		}
 		}
 
 
-		log.Printf("etcdserver: send message to unknown receiver %s", to)
+		log.Printf("rafthttp: ignored message %s (sent to unknown receiver %s)", m.Type, to)
 	}
 	}
 }
 }