Browse Source

etcdserver, pkg: skip needless log entry applying

This commit lets etcdserver skip needless log entry applying. If the
result of log applying isn't required by the node (client that issued
the request isn't talking with the node) and the operation has no side
effects, applying can be skipped.

It would contribute to reduce disk I/O on followers and be useful for
a cluster that processes much serializable get.
Hitoshi Mitake 9 years ago
parent
commit
abb20ec51f
5 changed files with 51 additions and 1 deletions
  1. 4 0
      etcdserver/apply.go
  2. 9 1
      etcdserver/server.go
  3. 4 0
      pkg/mock/mockwait/wait_recorder.go
  4. 11 0
      pkg/wait/wait.go
  5. 23 0
      pkg/wait/wait_test.go

+ 4 - 0
etcdserver/apply.go

@@ -784,3 +784,7 @@ func compareInt64(a, b int64) int {
 func isGteRange(rangeEnd []byte) bool {
 func isGteRange(rangeEnd []byte) bool {
 	return len(rangeEnd) == 1 && rangeEnd[0] == 0
 	return len(rangeEnd) == 1 && rangeEnd[0] == 0
 }
 }
+
+func noSideEffect(r *pb.InternalRaftRequest) bool {
+	return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
+}

+ 9 - 1
etcdserver/server.go

@@ -1082,8 +1082,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		id = raftReq.Header.ID
 		id = raftReq.Header.ID
 	}
 	}
 
 
-	ar := s.applyV3.Apply(&raftReq)
+	var ar *applyResult
+	if s.w.IsRegistered(id) || !noSideEffect(&raftReq) {
+		ar = s.applyV3.Apply(&raftReq)
+	}
 	s.setAppliedIndex(e.Index)
 	s.setAppliedIndex(e.Index)
+
+	if ar == nil {
+		return
+	}
+
 	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
 	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
 		s.w.Trigger(id, ar)
 		s.w.Trigger(id, ar)
 		return
 		return

+ 4 - 0
pkg/mock/mockwait/wait_recorder.go

@@ -41,3 +41,7 @@ func (w *waitRecorder) Register(id uint64) <-chan interface{} {
 func (w *waitRecorder) Trigger(id uint64, x interface{}) {
 func (w *waitRecorder) Trigger(id uint64, x interface{}) {
 	w.Record(testutil.Action{Name: "Trigger"})
 	w.Record(testutil.Action{Name: "Trigger"})
 }
 }
+
+func (w *waitRecorder) IsRegistered(id uint64) bool {
+	panic("waitRecorder.IsRegistered() shouldn't be called")
+}

+ 11 - 0
pkg/wait/wait.go

@@ -24,6 +24,7 @@ import (
 type Wait interface {
 type Wait interface {
 	Register(id uint64) <-chan interface{}
 	Register(id uint64) <-chan interface{}
 	Trigger(id uint64, x interface{})
 	Trigger(id uint64, x interface{})
+	IsRegistered(id uint64) bool
 }
 }
 
 
 type List struct {
 type List struct {
@@ -59,6 +60,13 @@ func (w *List) Trigger(id uint64, x interface{}) {
 	}
 	}
 }
 }
 
 
+func (w *List) IsRegistered(id uint64) bool {
+	w.l.Lock()
+	defer w.l.Unlock()
+	_, ok := w.m[id]
+	return ok
+}
+
 type waitWithResponse struct {
 type waitWithResponse struct {
 	ch <-chan interface{}
 	ch <-chan interface{}
 }
 }
@@ -71,3 +79,6 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 	return w.ch
 	return w.ch
 }
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
+func (w *waitWithResponse) IsRegistered(id uint64) bool {
+	panic("waitWithResponse.IsRegistered() shouldn't be called")
+}

+ 23 - 0
pkg/wait/wait_test.go

@@ -77,3 +77,26 @@ func TestTriggerDupSuppression(t *testing.T) {
 		t.Errorf("unexpected non-nil value: %v (%T)", g, g)
 		t.Errorf("unexpected non-nil value: %v (%T)", g, g)
 	}
 	}
 }
 }
+
+func TestIsRegistered(t *testing.T) {
+	wt := New()
+
+	wt.Register(0)
+	wt.Register(1)
+	wt.Register(2)
+
+	for i := uint64(0); i < 3; i++ {
+		if !wt.IsRegistered(i) {
+			t.Errorf("event ID %d isn't registered", i)
+		}
+	}
+
+	if wt.IsRegistered(4) {
+		t.Errorf("event ID 4 shouldn't be registered")
+	}
+
+	wt.Trigger(0, "foo")
+	if wt.IsRegistered(0) {
+		t.Errorf("event ID 0 is already triggered, shouldn't be registered")
+	}
+}