浏览代码

Merge pull request #4646 from xiang90/starvation

etcdserver: detect raft starvation
Xiang Li 9 年之前
父节点
当前提交
f0dbd0b856
共有 3 个文件被更改,包括 90 次插入0 次删除
  1. 12 0
      etcdserver/raft.go
  2. 9 0
      etcdserver/server.go
  3. 69 0
      pkg/contention/contention.go

+ 12 - 0
etcdserver/raft.go

@@ -25,6 +25,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -117,6 +118,8 @@ type raftNode struct {
 	// If transport is nil, server will panic.
 	transport rafthttp.Transporter
 
+	td *contention.TimeoutDetector
+
 	stopped chan struct{}
 	done    chan struct{}
 }
@@ -130,6 +133,14 @@ func (r *raftNode) start(s *EtcdServer) {
 	r.stopped = make(chan struct{})
 	r.done = make(chan struct{})
 
+	heartbeat := 200 * time.Millisecond
+	if s.cfg != nil {
+		heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond
+	}
+	// set up contention detectors for raft heartbeat message.
+	// expect to send a heartbeat within 2 heartbeat intervals.
+	r.td = contention.NewTimeoutDetector(2 * heartbeat)
+
 	go func() {
 		var syncC <-chan time.Time
 
@@ -162,6 +173,7 @@ func (r *raftNode) start(s *EtcdServer) {
 						if r.s.compactor != nil {
 							r.s.compactor.Resume()
 						}
+						r.td.Reset()
 					} else {
 						if r.s.lessor != nil {
 							r.s.lessor.Demote()

+ 9 - 0
etcdserver/server.go

@@ -918,6 +918,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
 	}
 }
 
+// TODO: move this function into raft.go
 func (s *EtcdServer) send(ms []raftpb.Message) {
 	for i := range ms {
 		if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
@@ -939,6 +940,14 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 				ms[i].To = 0
 			}
 		}
+		if ms[i].Type == raftpb.MsgHeartbeat {
+			ok, exceed := s.r.td.Observe(ms[i].To)
+			if !ok {
+				// TODO: limit request rate.
+				plog.Warningf("failed to send out heartbeat on time (deadline exceeded for %v)", exceed)
+				plog.Warningf("server is likely overloaded")
+			}
+		}
 	}
 
 	s.r.transport.Send(ms)

+ 69 - 0
pkg/contention/contention.go

@@ -0,0 +1,69 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package contention
+
+import (
+	"sync"
+	"time"
+)
+
+// TimeoutDetector detects routine starvation by
+// observing the actual time taken to finish an action,
+// or the time between two events that should happen at a
+// fixed interval. If the observed duration is longer than
+// expected, the detector reports by how much it was exceeded.
+type TimeoutDetector struct {
+	mu          sync.Mutex // protects all fields
+	maxDuration time.Duration
+	// records maps an event id to the time
+	// at which that event was last observed.
+	records map[uint64]time.Time
+}
+
+// NewTimeoutDetector creates a TimeoutDetector with the given maxDuration and no recorded events.
+func NewTimeoutDetector(maxDuration time.Duration) *TimeoutDetector {
+	return &TimeoutDetector{
+		maxDuration: maxDuration,
+		records:     make(map[uint64]time.Time),
+	}
+}
+
+// Reset resets the TimeoutDetector, discarding all recorded events.
+func (td *TimeoutDetector) Reset() {
+	td.mu.Lock()
+	defer td.mu.Unlock()
+
+	td.records = make(map[uint64]time.Time)
+}
+
+// Observe records an event for the given id. It returns false and the exceeded duration
+// if the interval since the previous event for that id is longer than the expectation.
+func (td *TimeoutDetector) Observe(which uint64) (bool, time.Duration) {
+	td.mu.Lock()
+	defer td.mu.Unlock()
+
+	ok := true
+	now := time.Now()
+	exceed := time.Duration(0)
+
+	if pt, found := td.records[which]; found {
+		exceed = now.Sub(pt) - td.maxDuration
+		if exceed > 0 {
+			ok = false
+		}
+	}
+	td.records[which] = now
+	return ok, exceed
+}