Browse Source

etcdserver: fix leader stats

Xiang Li 11 years ago
parent
commit
04522baeee
4 changed files with 23 additions and 12 deletions
  1. 6 1
      etcdserver/etcdhttp/client.go
  2. 4 1
      etcdserver/server.go
  3. 11 8
      rafthttp/sender.go
  4. 2 2
      rafthttp/sender_test.go

+ 6 - 1
etcdserver/etcdhttp/client.go

@@ -260,8 +260,13 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET") {
 		return
 	}
+	stats := h.stats.LeaderStats()
+	if stats == nil {
+		writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
+		return
+	}
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(h.stats.LeaderStats())
+	w.Write(stats)
 }
 
 func serveVersion(w http.ResponseWriter, r *http.Request) {

+ 4 - 1
etcdserver/server.go

@@ -526,7 +526,10 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
 
 func (s *EtcdServer) LeaderStats() []byte {
-	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
+	lead := atomic.LoadUint64(&s.raftLead)
+	if lead != uint64(s.id) {
+		return nil
+	}
 	return s.lstats.JSON()
 }
 

+ 11 - 8
rafthttp/sender.go

@@ -77,7 +77,7 @@ func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Proc
 		shouldstop:  shouldstop,
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
-		q:           make(chan []byte, senderBufSize),
+		q:           make(chan *raftpb.Message, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
@@ -98,7 +98,7 @@ type sender struct {
 	strmCln     *streamClient
 	batcher     *Batcher
 	propBatcher *ProposalBatcher
-	q           chan []byte
+	q           chan *raftpb.Message
 
 	strmSrvMu sync.Mutex
 	strmSrv   *streamServer
@@ -184,9 +184,8 @@ func (s *sender) Send(m raftpb.Message) error {
 func (s *sender) send(m raftpb.Message) error {
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
-	data := pbutil.MustMarshal(&m)
 	select {
-	case s.q <- data:
+	case s.q <- &m:
 		return nil
 	default:
 		log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
@@ -267,9 +266,9 @@ func (s *sender) tryStream(m raftpb.Message) bool {
 
 func (s *sender) handle() {
 	defer s.wg.Done()
-	for d := range s.q {
+	for m := range s.q {
 		start := time.Now()
-		err := s.post(d)
+		err := s.post(pbutil.MustMarshal(m))
 		end := time.Now()
 
 		s.mu.Lock()
@@ -282,14 +281,18 @@ func (s *sender) handle() {
 				log.Printf("sender: the connection with %s becomes inactive", s.id)
 				s.active = false
 			}
-			s.fs.Fail()
+			if m.Type == raftpb.MsgApp {
+				s.fs.Fail()
+			}
 		} else {
 			if !s.active {
 				log.Printf("sender: the connection with %s becomes active", s.id)
 				s.active = true
 				s.errored = nil
 			}
-			s.fs.Succ(end.Sub(start))
+			if m.Type == raftpb.MsgApp {
+				s.fs.Succ(end.Sub(start))
+			}
 		}
 		s.mu.Unlock()
 	}

+ 2 - 2
rafthttp/sender_test.go

@@ -36,7 +36,7 @@ func TestSenderSend(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
-	if err := s.Send(raftpb.Message{}); err != nil {
+	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}
 	s.Stop()
@@ -88,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
-	if err := s.Send(raftpb.Message{}); err != nil {
+	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
 	}
 	s.Stop()