Browse Source

rafthttp: add "ActivePeers" to "Transport"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
6fe7316ec4
2 changed files with 18 additions and 0 deletions
  1. 1 0
      etcdserver/util_test.go
  2. 17 0
      rafthttp/transport.go

+ 1 - 0
etcdserver/util_test.go

@@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID)              {}
 func (s *nopTransporterWithActiveTime) RemoveAllPeers()                     {}
 func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time   { return s.activeMap[id] }
+func (s *nopTransporterWithActiveTime) ActivePeers() int                    { return 0 }
 func (s *nopTransporterWithActiveTime) Stop()                               {}
 func (s *nopTransporterWithActiveTime) Pause()                              {}
 func (s *nopTransporterWithActiveTime) Resume()                             {}

+ 17 - 0
rafthttp/transport.go

@@ -85,6 +85,8 @@ type Transporter interface {
 	// If the connection is active since peer was added, it returns the adding time.
 	// If the connection is currently inactive, it returns zero time.
 	ActiveSince(id types.ID) time.Time
+	// ActivePeers returns the number of active peers.
+	ActivePeers() int
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -375,6 +377,20 @@ func (t *Transport) Resume() {
 	}
 }
 
+// ActivePeers returns a channel that closes when an initial
+// peer connection has been established. Use this to wait until the
+// first peer connection becomes active.
+func (t *Transport) ActivePeers() (cnt int) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, p := range t.peers {
+		if !p.activeSince().IsZero() {
+			cnt++
+		}
+	}
+	return cnt
+}
+
 type nopTransporter struct{}
 
 func NewNopTransporter() Transporter {
@@ -391,6 +407,7 @@ func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemoveAllPeers()                     {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
+func (s *nopTransporter) ActivePeers() int                    { return 0 }
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Resume()                             {}