Browse Source

Merge pull request #5689 from mitake/skip-apply

RFC: etcdserver, pkg: skip needless log entry applying
Hitoshi Mitake 9 years ago
parent
commit
c47689d98f
5 changed files with 51 additions and 1 deletions
  1. 4 0
      etcdserver/apply.go
  2. 9 1
      etcdserver/server.go
  3. 4 0
      pkg/mock/mockwait/wait_recorder.go
  4. 11 0
      pkg/wait/wait.go
  5. 23 0
      pkg/wait/wait_test.go

+ 4 - 0
etcdserver/apply.go

@@ -784,3 +784,7 @@ func compareInt64(a, b int64) int {
 func isGteRange(rangeEnd []byte) bool {
 	return len(rangeEnd) == 1 && rangeEnd[0] == 0
 }
+
+func noSideEffect(r *pb.InternalRaftRequest) bool {
+	return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
+}

+ 9 - 1
etcdserver/server.go

@@ -1076,8 +1076,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		id = raftReq.Header.ID
 	}
 
-	ar := s.applyV3.Apply(&raftReq)
+	var ar *applyResult
+	if s.w.IsRegistered(id) || !noSideEffect(&raftReq) {
+		ar = s.applyV3.Apply(&raftReq)
+	}
 	s.setAppliedIndex(e.Index)
+
+	if ar == nil {
+		return
+	}
+
 	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
 		s.w.Trigger(id, ar)
 		return

+ 4 - 0
pkg/mock/mockwait/wait_recorder.go

@@ -41,3 +41,7 @@ func (w *waitRecorder) Register(id uint64) <-chan interface{} {
 func (w *waitRecorder) Trigger(id uint64, x interface{}) {
 	w.Record(testutil.Action{Name: "Trigger"})
 }
+
+func (w *waitRecorder) IsRegistered(id uint64) bool {
+	panic("waitRecorder.IsRegistered() shouldn't be called")
+}

+ 11 - 0
pkg/wait/wait.go

@@ -24,6 +24,7 @@ import (
 type Wait interface {
 	Register(id uint64) <-chan interface{}
 	Trigger(id uint64, x interface{})
+	IsRegistered(id uint64) bool
 }
 
 type List struct {
@@ -59,6 +60,13 @@ func (w *List) Trigger(id uint64, x interface{}) {
 	}
 }
 
+func (w *List) IsRegistered(id uint64) bool {
+	w.l.Lock()
+	defer w.l.Unlock()
+	_, ok := w.m[id]
+	return ok
+}
+
 type waitWithResponse struct {
 	ch <-chan interface{}
 }
@@ -71,3 +79,6 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 	return w.ch
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
+func (w *waitWithResponse) IsRegistered(id uint64) bool {
+	panic("waitWithResponse.IsRegistered() shouldn't be called")
+}

+ 23 - 0
pkg/wait/wait_test.go

@@ -77,3 +77,26 @@ func TestTriggerDupSuppression(t *testing.T) {
 		t.Errorf("unexpected non-nil value: %v (%T)", g, g)
 	}
 }
+
+func TestIsRegistered(t *testing.T) {
+	wt := New()
+
+	wt.Register(0)
+	wt.Register(1)
+	wt.Register(2)
+
+	for i := uint64(0); i < 3; i++ {
+		if !wt.IsRegistered(i) {
+			t.Errorf("event ID %d isn't registered", i)
+		}
+	}
+
+	if wt.IsRegistered(4) {
+		t.Errorf("event ID 4 shouldn't be registered")
+	}
+
+	wt.Trigger(0, "foo")
+	if wt.IsRegistered(0) {
+		t.Errorf("event ID 0 is already triggered, shouldn't be registered")
+	}
+}