Browse Source

etcdserver: support apply wait

Xiang Li 9 years ago
parent
commit
83de13e4a8
4 changed files with 37 additions and 28 deletions
  1. 11 6
      etcdserver/server.go
  2. 5 17
      etcdserver/v3_server.go
  3. 11 2
      pkg/wait/wait_time.go
  4. 10 3
      pkg/wait/wait_time_test.go

+ 11 - 6
etcdserver/server.go

@@ -190,12 +190,14 @@ type EtcdServer struct {
 	applyV3 applierV3
 	applyV3 applierV3
 	// applyV3Base is the core applier without auth or quotas
 	// applyV3Base is the core applier without auth or quotas
 	applyV3Base applierV3
 	applyV3Base applierV3
-	kv          mvcc.ConsistentWatchableKV
-	lessor      lease.Lessor
-	bemu        sync.Mutex
-	be          backend.Backend
-	authStore   auth.AuthStore
-	alarmStore  *alarm.AlarmStore
+	applyWait   wait.WaitTime
+
+	kv         mvcc.ConsistentWatchableKV
+	lessor     lease.Lessor
+	bemu       sync.Mutex
+	be         backend.Backend
+	authStore  auth.AuthStore
+	alarmStore *alarm.AlarmStore
 
 
 	stats  *stats.ServerStats
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 	lstats *stats.LeaderStats
@@ -475,6 +477,7 @@ func (s *EtcdServer) start() {
 		s.snapCount = DefaultSnapCount
 		s.snapCount = DefaultSnapCount
 	}
 	}
 	s.w = wait.New()
 	s.w = wait.New()
+	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
 	s.stop = make(chan struct{})
 	if s.ClusterVersion() != nil {
 	if s.ClusterVersion() != nil {
@@ -629,10 +632,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 		plog.Warningf("avoid queries with large range/delete range!")
 		plog.Warningf("avoid queries with large range/delete range!")
 	}
 	}
 	proposalsApplied.Set(float64(ep.appliedi))
 	proposalsApplied.Set(float64(ep.appliedi))
+	s.applyWait.Trigger(ep.appliedi)
 	// wait for the raft routine to finish the disk writes before triggering a
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
 	// snapshot. or applied index might be greater than the last index in raft
 	// storage, since the raft routine might be slower than apply routine.
 	// storage, since the raft routine might be slower than apply routine.
 	<-apply.raftDone
 	<-apply.raftDone
+
 	s.triggerSnapshot(ep)
 	s.triggerSnapshot(ep)
 	select {
 	select {
 	// snapshot requested via send()
 	// snapshot requested via send()

+ 5 - 17
etcdserver/v3_server.go

@@ -463,24 +463,12 @@ func (s *EtcdServer) isValidSimpleToken(token string) bool {
 		return false
 		return false
 	}
 	}
 
 
-	// CAUTION: below index synchronization is required because this node
-	// might not receive and apply the log entry of Authenticate() RPC.
-	authApplied := false
-	for i := 0; i < 10; i++ {
-		if uint64(index) <= s.getAppliedIndex() {
-			authApplied = true
-			break
-		}
-
-		time.Sleep(100 * time.Millisecond)
-	}
-
-	if !authApplied {
-		plog.Errorf("timeout of waiting Authenticate() RPC")
-		return false
+	select {
+	case <-s.applyWait.Wait(uint64(index)):
+		return true
+	case <-s.stop:
+		return true
 	}
 	}
-
-	return true
 }
 }
 
 
 func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
 func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {

+ 11 - 2
pkg/wait/wait_time.go

@@ -25,9 +25,14 @@ type WaitTime interface {
 	Trigger(deadline uint64)
 	Trigger(deadline uint64)
 }
 }
 
 
+var closec chan struct{}
+
+func init() { closec = make(chan struct{}); close(closec) }
+
 type timeList struct {
 type timeList struct {
-	l sync.Mutex
-	m map[uint64]chan struct{}
+	l                   sync.Mutex
+	lastTriggerDeadline uint64
+	m                   map[uint64]chan struct{}
 }
 }
 
 
 func NewTimeList() *timeList {
 func NewTimeList() *timeList {
@@ -37,6 +42,9 @@ func NewTimeList() *timeList {
 func (tl *timeList) Wait(deadline uint64) <-chan struct{} {
 func (tl *timeList) Wait(deadline uint64) <-chan struct{} {
 	tl.l.Lock()
 	tl.l.Lock()
 	defer tl.l.Unlock()
 	defer tl.l.Unlock()
+	if tl.lastTriggerDeadline >= deadline {
+		return closec
+	}
 	ch := tl.m[deadline]
 	ch := tl.m[deadline]
 	if ch == nil {
 	if ch == nil {
 		ch = make(chan struct{})
 		ch = make(chan struct{})
@@ -48,6 +56,7 @@ func (tl *timeList) Wait(deadline uint64) <-chan struct{} {
 func (tl *timeList) Trigger(deadline uint64) {
 func (tl *timeList) Trigger(deadline uint64) {
 	tl.l.Lock()
 	tl.l.Lock()
 	defer tl.l.Unlock()
 	defer tl.l.Unlock()
+	tl.lastTriggerDeadline = deadline
 	for t, ch := range tl.m {
 	for t, ch := range tl.m {
 		if t <= deadline {
 		if t <= deadline {
 			delete(tl.m, t)
 			delete(tl.m, t)

+ 10 - 3
pkg/wait/wait_time_test.go

@@ -29,19 +29,26 @@ func TestWaitTime(t *testing.T) {
 		t.Fatalf("cannot receive from ch as expected")
 		t.Fatalf("cannot receive from ch as expected")
 	}
 	}
 
 
-	ch2 := wt.Wait(2)
-	wt.Trigger(1)
+	ch2 := wt.Wait(4)
+	wt.Trigger(3)
 	select {
 	select {
 	case <-ch2:
 	case <-ch2:
 		t.Fatalf("unexpected to receive from ch2")
 		t.Fatalf("unexpected to receive from ch2")
 	default:
 	default:
 	}
 	}
-	wt.Trigger(3)
+	wt.Trigger(4)
 	select {
 	select {
 	case <-ch2:
 	case <-ch2:
 	default:
 	default:
 		t.Fatalf("cannot receive from ch2 as expected")
 		t.Fatalf("cannot receive from ch2 as expected")
 	}
 	}
+
+	select {
+	// wait on a triggered deadline
+	case <-wt.Wait(4):
+	default:
+		t.Fatalf("unexpected blocking when wait on triggered deadline")
+	}
 }
 }
 
 
 func TestWaitTestStress(t *testing.T) {
 func TestWaitTestStress(t *testing.T) {