@@ -474,35 +474,75 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
+// newApplier buffers apply operations and streams their results over an
+// etcdProgress output channel. This is so raftNode won't block on sending
+// new applies, timing out (since applies can be slow). The goroutine begins
+// shutdown on close(s.done) and closes the etcdProgress channel when finished.
+func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
+	etcdprogc := make(chan etcdProgress)
+	go func() {
+		defer close(etcdprogc)
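+		// pending queues apply packets that arrive while an apply is in flight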
+		pending := []apply{}
+		sdonec := s.done
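+		// apdonec signals that the in-flight apply has completed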
+		apdonec := make(chan struct{})
+		// serialized function: apply a packet, publish its progress, signal done
+		f := func(ap apply) {
+			s.applyAll(&ep, &ap)
+			etcdprogc <- ep
+			apdonec <- struct{}{}
+		}
+		for sdonec != nil || len(pending) > 0 {
+			select {
+			// launch if no pending apply packet, queue up the rest
+			case ap := <-s.r.apply():
+				pending = append(pending, ap)
+				if len(pending) == 1 {
+					go f(pending[0])
+				}
+			// pending apply serviced, schedule the next one
+			case <-apdonec:
+				pending = pending[1:]
+				if len(pending) != 0 {
+					go f(pending[0])
+				}
+			// run() is finished; drain pending and exit
+			case <-sdonec:
+				sdonec = nil
+			}
+		}
+	}()
+	return etcdprogc
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
 	s.r.start(s)
-	defer func() {
-		s.r.stop()
-		close(s.done)
-	}()
 
+	// asynchronously accept apply packets, dispatch progress in-order
 	ep := etcdProgress{
 		confState: snap.Metadata.ConfState,
 		snapi:     snap.Metadata.Index,
 		appliedi:  snap.Metadata.Index,
 	}
+	etcdprogc := s.newApplier(ep)
+
+	defer func() {
+		s.r.stop()
+		close(s.done)
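+		// newApplier's goroutine closes etcdprogc once pending applies drain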
+		for range etcdprogc {
+			/* wait for outstanding applies */
+		}
+	}()
 
 	for {
 		select {
-		case apply := <-s.r.apply():
-			s.applySnapshot(&ep, &apply)
-			s.applyEntries(&ep, &apply)
-			// wait for the raft routine to finish the disk writes before triggering a
-			// snapshot. or applied index might be greater than the last index in raft
-			// storage, since the raft routine might be slower than apply routine.
-			apply.done <- struct{}{}
-
-			// trigger snapshot
-			s.triggerSnapshot(&ep)
+		case ep = <-etcdprogc:
 		case m := <-s.msgSnapC:
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
 			s.r.transport.SendSnapshot(merged)
@@ -514,6 +554,17 @@ func (s *EtcdServer) run() {
 			return
 		}
 	}
+
+}
+
+func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
+	s.applySnapshot(ep, apply)
+	s.applyEntries(ep, apply)
+	// wait for the raft routine to finish the disk writes before triggering a
+	// snapshot. Or the applied index might be greater than the last index in raft
+	// storage, since the raft routine might be slower than the apply routine.
+	apply.done <- struct{}{}
+	s.triggerSnapshot(ep)
 }
 
 func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {