Browse Source

Merge pull request #3892 from xiang90/fix_snapshot_handling

etcdserver: handle incoming v3 snapshot correctly
Xiang Li 10 years ago
parent
commit
c400d05d0a
1 changed files with 18 additions and 5 deletions
  1. 18 5
      etcdserver/server.go

+ 18 - 5
etcdserver/server.go

@@ -496,21 +496,34 @@ func (s *EtcdServer) run() {
 				}
 
 				if s.cfg.V3demo {
-					if err := s.kv.Close(); err != nil {
-						plog.Panicf("close KV error: %v", err)
-					}
 					snapfn, err := s.r.raftStorage.snapStore.getSnapFilePath(apply.snapshot.Metadata.Index)
 					if err != nil {
 						plog.Panicf("get snapshot file path error: %v", err)
 					}
+
 					fn := path.Join(s.cfg.StorageDir(), databaseFilename)
 					if err := os.Rename(snapfn, fn); err != nil {
 						plog.Panicf("rename snapshot file error: %v", err)
 					}
-					s.kv = dstorage.New(fn, &s.consistIndex)
-					if err := s.kv.Restore(); err != nil {
+
+					newKV := dstorage.New(fn, &s.consistIndex)
+					if err := newKV.Restore(); err != nil {
 						plog.Panicf("restore KV error: %v", err)
 					}
+
+					oldKV := s.kv
+					// TODO: swap the kv pointer atomically
+					s.kv = newKV
+					s.r.raftStorage.snapStore.kv = newKV
+
+					// Closing oldKV might block until all the txns
+					// on the kv are finished.
+					// We do not want to wait on closing the old kv.
+					go func() {
+						if err := oldKV.Close(); err != nil {
+							plog.Panicf("close KV error: %v", err)
+						}
+					}()
 				}
 				if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 					plog.Panicf("recovery store error: %v", err)