Browse Source

Merge pull request #3977 from xiang90/fix_todo

etcdsever: swap kv pointer atomically
Xiang Li 10 years ago
parent
commit
460873689e
2 changed files with 28 additions and 11 deletions
  1. 20 5
      etcdserver/server.go
  2. 8 6
      etcdserver/v3demo_server.go

+ 20 - 5
etcdserver/server.go

@@ -24,6 +24,7 @@ import (
 	"os"
 	"path"
 	"regexp"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -162,7 +163,9 @@ type EtcdServer struct {
 	cluster *cluster
 
 	store store.Store
-	kv    dstorage.ConsistentWatchableKV
+
+	kvMu sync.RWMutex
+	kv   dstorage.ConsistentWatchableKV
 
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
@@ -506,9 +509,7 @@ func (s *EtcdServer) run() {
 						plog.Panicf("restore KV error: %v", err)
 					}
 
-					oldKV := s.kv
-					// TODO: swap the kv pointer atomically
-					s.kv = newKV
+					oldKV := s.swapKV(newKV)
 
 					// Closing oldKV might block until all the txns
 					// on the kv are finished.
@@ -1032,7 +1033,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		if s.cfg.V3demo {
 			// commit v3 storage because WAL file before snapshot index
 			// could be removed after SaveSnap.
-			s.kv.Commit()
+			s.getKV().Commit()
 		}
 		// SaveSnap saves the snapshot and releases the locked wal files
 		// to the snapshot index.
@@ -1172,6 +1173,20 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	}
 }
 
+func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV {
+	s.kvMu.RLock()
+	defer s.kvMu.RUnlock()
+	return s.kv
+}
+
+func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV {
+	s.kvMu.Lock()
+	defer s.kvMu.Unlock()
+	old := s.kv
+	s.kv = kv
+	return old
+}
+
 // isConnectedToQuorumSince checks whether the local member is connected to the
 // quorum of the cluster since the given time.
 func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {

+ 8 - 6
etcdserver/v3demo_server.go

@@ -102,7 +102,7 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
 
 // Watcable returns a watchable interface attached to the etcdserver.
 func (s *EtcdServer) Watchable() dstorage.Watchable {
-	return s.kv
+	return s.getKV()
 }
 
 const (
@@ -113,19 +113,21 @@ const (
 )
 
 func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
+	kv := s.getKV()
+
 	ar := &applyResult{}
 
 	switch {
 	case r.Range != nil:
-		ar.resp, ar.err = applyRange(noTxn, s.kv, r.Range)
+		ar.resp, ar.err = applyRange(noTxn, kv, r.Range)
 	case r.Put != nil:
-		ar.resp, ar.err = applyPut(noTxn, s.kv, r.Put)
+		ar.resp, ar.err = applyPut(noTxn, kv, r.Put)
 	case r.DeleteRange != nil:
-		ar.resp, ar.err = applyDeleteRange(noTxn, s.kv, r.DeleteRange)
+		ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange)
 	case r.Txn != nil:
-		ar.resp, ar.err = applyTxn(s.kv, r.Txn)
+		ar.resp, ar.err = applyTxn(kv, r.Txn)
 	case r.Compaction != nil:
-		ar.resp, ar.err = applyCompaction(s.kv, r.Compaction)
+		ar.resp, ar.err = applyCompaction(kv, r.Compaction)
 	default:
 		panic("not implemented")
 	}