|
|
@@ -39,6 +39,7 @@ import (
|
|
|
"github.com/coreos/etcd/pkg/idutil"
|
|
|
"github.com/coreos/etcd/pkg/pbutil"
|
|
|
"github.com/coreos/etcd/pkg/runtime"
|
|
|
+ "github.com/coreos/etcd/pkg/schedule"
|
|
|
"github.com/coreos/etcd/pkg/timeutil"
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
|
"github.com/coreos/etcd/pkg/wait"
|
|
|
@@ -487,51 +488,6 @@ type etcdProgress struct {
|
|
|
appliedi uint64
|
|
|
}
|
|
|
|
|
|
-// startApplier buffers apply operations so raftNode won't block on sending
|
|
|
-// new applies, timing out (since applies can be slow). The goroutine begins
|
|
|
-// shutdown on close(s.done) and closes the returned channel when finished.
|
|
|
-func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
|
|
|
- donec := make(chan struct{})
|
|
|
- go func() {
|
|
|
- defer close(donec)
|
|
|
- pending := []apply{}
|
|
|
- sdonec := s.done
|
|
|
- apdonec := make(chan struct{})
|
|
|
- // serialized function
|
|
|
- f := func(ap apply) {
|
|
|
- s.applyAll(&ep, &ap)
|
|
|
- select {
|
|
|
- // snapshot requested via send()
|
|
|
- case m := <-s.msgSnapC:
|
|
|
- merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
|
|
|
- s.sendMergedSnap(merged)
|
|
|
- default:
|
|
|
- }
|
|
|
- apdonec <- struct{}{}
|
|
|
- }
|
|
|
- for sdonec != nil || len(pending) > 0 {
|
|
|
- select {
|
|
|
- // launch if no pending apply packet, queue up the rest
|
|
|
- case ap := <-s.r.apply():
|
|
|
- pending = append(pending, ap)
|
|
|
- if len(pending) == 1 {
|
|
|
- go f(pending[0])
|
|
|
- }
|
|
|
- // pending apply serviced, schedule the next one
|
|
|
- case <-apdonec:
|
|
|
- pending = pending[1:]
|
|
|
- if len(pending) != 0 {
|
|
|
- go f(pending[0])
|
|
|
- }
|
|
|
- // run() is finished; drain pending and exit
|
|
|
- case <-sdonec:
|
|
|
- sdonec = nil
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- return donec
|
|
|
-}
|
|
|
-
|
|
|
func (s *EtcdServer) run() {
|
|
|
snap, err := s.r.raftStorage.Snapshot()
|
|
|
if err != nil {
|
|
|
@@ -540,14 +496,17 @@ func (s *EtcdServer) run() {
|
|
|
s.r.start(s)
|
|
|
|
|
|
// asynchronously accept apply packets, dispatch progress in-order
|
|
|
- appdonec := s.startApplier(etcdProgress{
|
|
|
+ sched := schedule.NewFIFOScheduler()
|
|
|
+ ep := etcdProgress{
|
|
|
confState: snap.Metadata.ConfState,
|
|
|
snapi: snap.Metadata.Index,
|
|
|
appliedi: snap.Metadata.Index,
|
|
|
- })
|
|
|
+ }
|
|
|
|
|
|
defer func() {
|
|
|
s.r.stop()
|
|
|
+ sched.Stop()
|
|
|
+
|
|
|
// kv, lessor and backend can be nil if running without v3 enabled
|
|
|
// or running unit tests.
|
|
|
if s.lessor != nil {
|
|
|
@@ -560,7 +519,6 @@ func (s *EtcdServer) run() {
|
|
|
s.be.Close()
|
|
|
}
|
|
|
close(s.done)
|
|
|
- <-appdonec
|
|
|
}()
|
|
|
|
|
|
var expiredLeaseC <-chan []*lease.Lease
|
|
|
@@ -570,6 +528,9 @@ func (s *EtcdServer) run() {
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
+ case ap := <-s.r.apply():
|
|
|
+ f := func(context.Context) { s.applyAll(&ep, &ap) }
|
|
|
+ sched.Schedule(f)
|
|
|
case leases := <-expiredLeaseC:
|
|
|
go func() {
|
|
|
for _, l := range leases {
|
|
|
@@ -594,6 +555,13 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
|
|
// storage, since the raft routine might be slower than apply routine.
|
|
|
<-apply.raftDone
|
|
|
s.triggerSnapshot(ep)
|
|
|
+ select {
|
|
|
+ // snapshot requested via send()
|
|
|
+ case m := <-s.msgSnapC:
|
|
|
+ merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
|
|
|
+ s.sendMergedSnap(merged)
|
|
|
+ default:
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|