
Merge pull request #5515 from xiang90/logging

*: more logging on critical state change
Xiang Li 9 years ago
parent
commit
36fcc9e9d4

+ 6 - 0
etcdserver/membership/cluster.go

@@ -290,6 +290,8 @@ func (c *RaftCluster) AddMember(m *Member) {
 	}
 
 	c.members[m.ID] = m
+
+	plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
 }
 
 // RemoveMember removes a member from the store.
@@ -306,6 +308,8 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
 
 	delete(c.members, id)
 	c.removed[id] = true
+
+	plog.Infof("removed member %s from cluster %s", id, c.id)
 }
 
 func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
@@ -339,6 +343,8 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
 	if c.be != nil {
 		mustSaveMemberToBackend(c.be, c.members[id])
 	}
+
+	plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
 }
 
 func (c *RaftCluster) Version() *semver.Version {
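
Note: the plog calls added above go through a package-level logger. For reference, a minimal sketch of how such a logger is typically declared with the coreos capnslog package that etcd used at the time; the exact repo and package strings below are assumptions, not part of this diff:

    package membership

    import "github.com/coreos/pkg/capnslog"

    // Package-level logger; the Infof/Noticef calls in this diff are
    // methods on it. Repo and package names here are illustrative.
    var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/membership")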

+ 30 - 12
etcdserver/server.go

@@ -614,6 +614,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		return
 	}
 
+	plog.Infof("applying snapshot at index %d...", ep.snapi)
+	defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
+
 	if apply.snapshot.Metadata.Index <= ep.appliedi {
 		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
 			apply.snapshot.Metadata.Index, ep.appliedi)
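
Note: Go evaluates the arguments of a deferred call when the defer statement executes, not when the function returns. So the deferred "finished" line above reports ep.snapi as it was on entry, not the index assigned at the end of applySnapshot, which is where the removed "recovered from incoming snapshot" line used to read it. A self-contained illustration of this rule:

    package main

    import "fmt"

    func main() {
    	index := uint64(10)
    	// Arguments to a deferred call are evaluated at the defer
    	// statement, so this prints "finished at index 10" even
    	// though index is reassigned before main returns.
    	defer fmt.Printf("finished at index %d\n", index)
    	index = 42
    	fmt.Printf("applied up to index %d\n", index)
    }
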
@@ -630,17 +633,25 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 
 	newbe := backend.NewDefaultBackend(fn)
+
+	plog.Info("restoring mvcc store...")
+
 	if err := s.kv.Restore(newbe); err != nil {
 		plog.Panicf("restore KV error: %v", err)
 	}
 	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
 
+	plog.Info("finished restoring mvcc store")
+
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
 	// We do not want to wait on closing the old backend.
 	s.bemu.Lock()
 	oldbe := s.be
 	go func() {
+		plog.Info("closing old backend...")
+		defer plog.Info("finished closing old backend")
+
 		if err := oldbe.Close(); err != nil {
 			plog.Panicf("close backend error: %v", err)
 		}
@@ -650,36 +661,51 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	s.bemu.Unlock()
 
 	if s.lessor != nil {
+		plog.Info("recovering lessor...")
 		s.lessor.Recover(newbe, s.kv)
+		plog.Info("finished recovering lessor")
 	}
 
+	plog.Info("recovering alarms...")
 	if err := s.restoreAlarms(); err != nil {
 		plog.Panicf("restore alarms error: %v", err)
 	}
+	plog.Info("finished recovering alarms")
 
 	if s.authStore != nil {
+		plog.Info("recovering auth store...")
 		s.authStore.Recover(newbe)
+		plog.Info("finished recovering auth store")
 	}
 
+	plog.Info("recovering store v2...")
 	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 		plog.Panicf("recovery store error: %v", err)
 	}
+	plog.Info("finished recovering store v2")
+
 	s.cluster.SetBackend(s.be)
+	plog.Info("recovering cluster configuration...")
 	s.cluster.Recover()
+	plog.Info("finished recovering cluster configuration")
 
+	plog.Info("removing old peers from network...")
 	// recover raft transport
 	s.r.transport.RemoveAllPeers()
+	plog.Info("finished removing old peers from network")
+
+	plog.Info("adding peers from new cluster configuration into network...")
 	for _, m := range s.cluster.Members() {
 		if m.ID == s.ID() {
 			continue
 		}
 		s.r.transport.AddPeer(m.ID, m.PeerURLs)
 	}
+	plog.Info("finished adding peers from new cluster configuration into network...")
 
 	ep.appliedi = apply.snapshot.Metadata.Index
 	ep.snapi = ep.appliedi
 	ep.confState = apply.snapshot.Metadata.ConfState
-	plog.Infof("recovered from incoming snapshot at index %d", ep.snapi)
 }
 
 func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
@@ -1075,21 +1101,16 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			plog.Panicf("nodeID should always be equal to member ID")
 		}
 		s.cluster.AddMember(m)
-		if m.ID == s.id {
-			plog.Noticef("added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
-		} else {
+		if m.ID != s.id {
 			s.r.transport.AddPeer(m.ID, m.PeerURLs)
-			plog.Noticef("added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
 		s.cluster.RemoveMember(id)
 		if id == s.id {
 			return true, nil
-		} else {
-			s.r.transport.RemovePeer(id)
-			plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID())
 		}
+		s.r.transport.RemovePeer(id)
 	case raftpb.ConfChangeUpdateNode:
 		m := new(membership.Member)
 		if err := json.Unmarshal(cc.Context, m); err != nil {
@@ -1099,11 +1120,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			plog.Panicf("nodeID should always be equal to member ID")
 		}
 		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
-		if m.ID == s.id {
-			plog.Noticef("update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
-		} else {
+		if m.ID != s.id {
 			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
-			plog.Noticef("update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
 		}
 	}
 	return false, nil
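
Note: the applyConfChange hunks drop the per-branch Noticef calls because the same messages are now emitted inside RaftCluster.AddMember / RemoveMember / UpdateRaftAttributes (the first file in this diff), so every caller of those methods, including snapshot recovery, produces the log line, and the local/non-local branches collapse to a single transport call. Add and remove also drop from Notice to Info level in the process. A toy sketch of the consolidation pattern, with hypothetical types:

    package main

    import "fmt"

    // cluster is a hypothetical stand-in for RaftCluster: the mutation
    // method owns its own log line, so each caller gets it for free.
    type cluster struct{ members map[uint64]string }

    func (c *cluster) AddMember(id uint64, url string) {
    	c.members[id] = url
    	fmt.Printf("added member %x [%s] to cluster\n", id, url)
    }

    func main() {
    	c := &cluster{members: map[uint64]string{}}
    	c.AddMember(0x8211f1d0f64f3269, "http://127.0.0.1:2380") // conf-change path
    	c.AddMember(0x91bc3c398fb3c146, "http://127.0.0.1:2381") // recovery path
    }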

+ 6 - 0
rafthttp/peer.go

@@ -118,6 +118,9 @@ type peer struct {
 }
 
 func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
+	plog.Infof("starting peer %s...", to)
+	defer plog.Infof("started peer %s", to)
+
 	status := newPeerStatus(to)
 	picker := newURLPicker(urls)
 	pipeline := &pipeline{
@@ -270,6 +273,9 @@ func (p *peer) Resume() {
 }
 
 func (p *peer) stop() {
+	plog.Infof("stopping peer %s...", p.id)
+	defer plog.Infof("stopped peer %s", p.id)
+
 	close(p.stopc)
 	p.cancel()
 	p.msgAppV2Writer.stop()

+ 2 - 2
rafthttp/peer_status.go

@@ -44,7 +44,7 @@ func (s *peerStatus) activate() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if !s.active {
-		plog.Infof("the connection with %s became active", s.id)
+		plog.Infof("peer %s became active", s.id)
 		s.active = true
 		s.since = time.Now()
 	}
@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if s.active {
 		plog.Errorf(msg)
-		plog.Infof("the connection with %s became inactive", s.id)
+		plog.Infof("peer %s became inactive", s.id)
 		s.active = false
 		s.since = time.Time{}
 		return

+ 2 - 0
rafthttp/pipeline.go

@@ -64,11 +64,13 @@ func (p *pipeline) start() {
 	for i := 0; i < connPerPipeline; i++ {
 		go p.handle()
 	}
+	plog.Infof("started HTTP pipelining with peer %s", p.to)
 }
 
 func (p *pipeline) stop() {
	close(p.stopc)
 	p.wg.Wait()
+	plog.Infof("stopped HTTP pipelining with peer %s", p.to)
 }
 
 func (p *pipeline) handle() {
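
Note: placing the "stopped" line after p.wg.Wait() means it is only printed once every in-flight handler goroutine has drained. A minimal standalone sketch of that shutdown ordering; names here are illustrative:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	stopc := make(chan struct{})
    	var wg sync.WaitGroup
    	for i := 0; i < 4; i++ { // mirrors the connPerPipeline handler goroutines
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			<-stopc // block until stop is signalled
    		}()
    	}
    	close(stopc)                            // signal all handlers
    	wg.Wait()                               // wait for them to exit...
    	fmt.Println("stopped HTTP pipelining") // ...then report
    }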

+ 1 - 1
rafthttp/probing_status.go

@@ -48,7 +48,7 @@ func monitorProbingStatus(s probing.Status, id string) {
 		select {
 		case <-time.After(statusMonitoringInterval):
 			if !s.Health() {
-				plog.Warningf("the connection to peer %s is unhealthy", id)
+				plog.Warningf("health check for peer %s failed", id)
 			}
 			if s.ClockDiff() > time.Second {
 				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)

+ 21 - 5
rafthttp/stream.go

@@ -141,6 +141,8 @@ func (cw *streamWriter) run() {
 	tickc := time.Tick(ConnReadTimeout / 3)
 	unflushed := 0
 
+	plog.Infof("started streaming with peer %s (writer)", cw.id)
+
 	for {
 		select {
 		case <-heartbeatc:
@@ -155,7 +157,9 @@ func (cw *streamWriter) run() {
 			}
 
 			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
+
 			cw.close()
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = nil, nil
 
 		case m := <-msgc:
@@ -177,11 +181,14 @@ func (cw *streamWriter) run() {
 
 			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
 			cw.close()
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = nil, nil
 			cw.r.ReportUnreachable(m.To)
 
 		case conn := <-cw.connc:
-			cw.close()
+			if cw.close() {
+				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			}
 			t = conn.t
 			switch conn.t {
 			case streamTypeMsgAppV2:
@@ -198,10 +205,14 @@ func (cw *streamWriter) run() {
 			cw.closer = conn.Closer
 			cw.working = true
 			cw.mu.Unlock()
+			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t)
 			heartbeatc, msgc = tickc, cw.msgc
 		case <-cw.stopc:
-			cw.close()
+			if cw.close() {
+				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			}
 			close(cw.done)
+			plog.Infof("stopped streaming with peer %s (writer)", cw.id)
 			return
 		}
 	}
@@ -213,11 +224,11 @@ func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
 	return cw.msgc, cw.working
 }
 
-func (cw *streamWriter) close() {
+func (cw *streamWriter) close() bool {
 	cw.mu.Lock()
 	defer cw.mu.Unlock()
 	if !cw.working {
-		return
+		return false
 	}
 	cw.closer.Close()
 	if len(cw.msgc) > 0 {
@@ -225,6 +236,7 @@ func (cw *streamWriter) close() {
 	}
 	cw.msgc = make(chan raftpb.Message, streamBufSize)
 	cw.working = false
+	return true
 }
 
 func (cw *streamWriter) attach(conn *outgoingConn) bool {
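
Note: changing close() to return whether it actually tore down a connection lets each caller log the "closed ..." message only when there was a live connection, instead of on every no-op close. The idiom in isolation, with a hypothetical type:

    package main

    import "fmt"

    // conn mimics streamWriter's working flag: close reports whether it
    // did any work, so callers can decide whether the event is worth logging.
    type conn struct{ working bool }

    func (c *conn) close() bool {
    	if !c.working {
    		return false // nothing to tear down
    	}
    	c.working = false
    	return true
    }

    func main() {
    	c := &conn{working: true}
    	if c.close() {
    		fmt.Println("closed an existing connection") // printed once
    	}
    	if c.close() {
    		fmt.Println("not printed: already closed")
    	}
    }
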
@@ -275,8 +287,9 @@ func (r *streamReader) start() {
 }
 
 func (cr *streamReader) run() {
+	t := cr.typ
+	plog.Infof("started streaming with peer %s (%s reader)", cr.to, t)
 	for {
-		t := cr.typ
 		rc, err := cr.dial(t)
 		if err != nil {
 			if err != errUnsupportedStreamType {
@@ -284,7 +297,9 @@ func (cr *streamReader) run() {
 			}
 		} else {
 			cr.status.activate()
+			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
 			err := cr.decodeLoop(rc, t)
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
 			switch {
 			// all data is read out
 			case err == io.EOF:
@@ -300,6 +315,7 @@ func (cr *streamReader) run() {
 		case <-time.After(100 * time.Millisecond):
 		case <-cr.stopc:
 			close(cr.done)
+			plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t)
 			return
 		}
 	}

+ 5 - 0
rafthttp/transport.go

@@ -231,6 +231,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 func (t *Transport) AddPeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+
 	if t.peers == nil {
 		panic("transport stopped")
 	}
@@ -244,6 +245,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
 	addPeerToProber(t.prober, id.String(), us)
+
+	plog.Infof("added peer %s", id)
 }
 
 func (t *Transport) RemovePeer(id types.ID) {
@@ -270,6 +273,7 @@ func (t *Transport) removePeer(id types.ID) {
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
 	t.prober.Remove(id.String())
+	plog.Infof("removed peer %s", id)
 }
 
 func (t *Transport) UpdatePeer(id types.ID, us []string) {
@@ -287,6 +291,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 
 	t.prober.Remove(id.String())
 	addPeerToProber(t.prober, id.String(), us)
+	plog.Infof("updated peer %s", id)
 }
 
 func (t *Transport) ActiveSince(id types.ID) time.Time {
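
Note: most of the new lines log at Info, with Noticef for member updates and Warningf for lost connections and failed health checks. In capnslog, levels order CRITICAL > ERROR > WARNING > NOTICE > INFO > DEBUG > TRACE, so operators can raise the global level to NOTICE to silence the chattier connection-lifecycle messages while keeping the state-change and warning ones. A sketch of tuning that, assuming the coreos/pkg/capnslog API:

    package main

    import (
    	"os"

    	"github.com/coreos/pkg/capnslog"
    )

    var plog = capnslog.NewPackageLogger("example", "demo")

    func main() {
    	capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, false))
    	capnslog.SetGlobalLogLevel(capnslog.NOTICE) // hides Infof, keeps Noticef and above
    	plog.Infof("added peer %s", "8e9e05c52164694d")       // suppressed at NOTICE
    	plog.Noticef("updated member %s", "8e9e05c52164694d") // printed
    }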