Browse Source

Merge pull request #1939 from xiang90/sender_logging

rafthttp: better logging
Xiang Li 11 years ago
parent
commit
d07434f99e
3 changed files with 46 additions and 21 deletions
  1. 1 1
      etcdserver/sendhub.go
  2. 39 14
      rafthttp/sender.go
  3. 6 6
      rafthttp/sender_test.go

+ 1 - 1
etcdserver/sendhub.go

@@ -113,7 +113,7 @@ func (h *sendHub) Add(m *Member) {
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
-	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop)
+	s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop)
 	h.senders[m.ID] = s
 }
 

+ 39 - 14
rafthttp/sender.go

@@ -65,8 +65,10 @@ type Sender interface {
 	Resume()
 }
 
-func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
+		id:          id,
+		active:      true,
 		tr:          tr,
 		u:           u,
 		cid:         cid,
@@ -85,9 +87,10 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
 }
 
 type sender struct {
+	id  types.ID
+	cid types.ID
+
 	tr         http.RoundTripper
-	u          string
-	cid        types.ID
 	p          Processor
 	fs         *stats.FollowerStats
 	shouldstop chan struct{}
@@ -99,9 +102,16 @@ type sender struct {
 	strmSrvMu   sync.Mutex
 	q           chan []byte
 
-	paused bool
-	mu     sync.RWMutex
-	wg     sync.WaitGroup
+	// wait for the handling routines
+	wg sync.WaitGroup
+
+	mu sync.RWMutex
+	u  string // the url this sender post to
+	// if the last send was successful, thi sender is active.
+	// Or it is inactive
+	active  bool
+	errored error
+	paused  bool
 }
 
 func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
@@ -258,12 +268,27 @@ func (s *sender) handle() {
 		start := time.Now()
 		err := s.post(d)
 		end := time.Now()
+
+		s.mu.Lock()
 		if err != nil {
+			if s.errored == nil || s.errored.Error() != err.Error() {
+				log.Printf("sender: error posting to %s: %v", s.id, err)
+				s.errored = err
+			}
+			if s.active {
+				log.Printf("sender: the connection with %s becomes inactive", s.id)
+				s.active = false
+			}
 			s.fs.Fail()
-			log.Printf("sender: %v", err)
-			continue
+		} else {
+			if !s.active {
+				log.Printf("sender: the connection with %s becomes active", s.id)
+				s.active = true
+				s.errored = nil
+			}
+			s.fs.Succ(end.Sub(start))
 		}
-		s.fs.Succ(end.Sub(start))
+		s.mu.Unlock()
 	}
 }
 
@@ -274,13 +299,13 @@ func (s *sender) post(data []byte) error {
 	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
 	s.mu.RUnlock()
 	if err != nil {
-		return fmt.Errorf("new request to %s error: %v", s.u, err)
+		return err
 	}
 	req.Header.Set("Content-Type", "application/protobuf")
 	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
 	resp, err := s.tr.RoundTrip(req)
 	if err != nil {
-		return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
+		return err
 	}
 	resp.Body.Close()
 
@@ -290,15 +315,15 @@ func (s *sender) post(data []byte) error {
 		case s.shouldstop <- struct{}{}:
 		default:
 		}
-		log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
 		return nil
 	case http.StatusForbidden:
 		select {
 		case s.shouldstop <- struct{}{}:
 		default:
 		}
-		log.Println("etcdserver: this member has been permanently removed from the cluster")
-		log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+		log.Println("rafthttp: this member has been permanently removed from the cluster")
+		log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 		return nil
 	case http.StatusNoContent:
 		return nil

+ 6 - 6
rafthttp/sender_test.go

@@ -34,7 +34,7 @@ import (
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
@@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -86,7 +86,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
@@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
 		err := s.post([]byte("some data"))
 		s.Stop()
 
@@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
 		s.post([]byte("some data"))
 		s.Stop()
 		select {