Forráskód Böngészése

storage: KV field -> store field in watchableStore

We need to access the underlying store to use its RangeEvents function.
It is not good to use unnecessary type conversion.

The underlying store is also needed for further store upon
watchableStore.
Yicheng Qin 10 éve
szülő
commit
0f7374ce89
1 módosított fájl, 16 hozzáadás és 16 törlés
  1. 16 16
      storage/watchable_store.go

+ 16 - 16
storage/watchable_store.go

@@ -25,7 +25,7 @@ import (
 type watchableStore struct {
 type watchableStore struct {
 	mu sync.Mutex
 	mu sync.Mutex
 
 
-	KV
+	*store
 
 
 	// contains all unsynced watchers that needs to sync events that have happened
 	// contains all unsynced watchers that needs to sync events that have happened
 	// TODO: use map to reduce cancel cost
 	// TODO: use map to reduce cancel cost
@@ -41,7 +41,7 @@ type watchableStore struct {
 
 
 func newWatchableStore(path string) *watchableStore {
 func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 	s := &watchableStore{
-		KV:     newStore(path),
+		store:  newStore(path),
 		synced: make(map[string][]*watcher),
 		synced: make(map[string][]*watcher),
 		stopc:  make(chan struct{}),
 		stopc:  make(chan struct{}),
 	}
 	}
@@ -54,9 +54,9 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
-	rev = s.KV.Put(key, value)
+	rev = s.store.Put(key, value)
 	// TODO: avoid this range
 	// TODO: avoid this range
-	kvs, _, err := s.KV.Range(key, nil, 0, rev)
+	kvs, _, err := s.store.Range(key, nil, 0, rev)
 	if err != nil {
 	if err != nil {
 		log.Panicf("unexpected range error (%v)", err)
 		log.Panicf("unexpected range error (%v)", err)
 	}
 	}
@@ -72,11 +72,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
 	// TODO: avoid this range
 	// TODO: avoid this range
-	kvs, _, err := s.KV.Range(key, end, 0, 0)
+	kvs, _, err := s.store.Range(key, end, 0, 0)
 	if err != nil {
 	if err != nil {
 		log.Panicf("unexpected range error (%v)", err)
 		log.Panicf("unexpected range error (%v)", err)
 	}
 	}
-	n, rev = s.KV.DeleteRange(key, end)
+	n, rev = s.store.DeleteRange(key, end)
 	for _, kv := range kvs {
 	for _, kv := range kvs {
 		s.handle(rev, storagepb.Event{
 		s.handle(rev, storagepb.Event{
 			Type: storagepb.DELETE,
 			Type: storagepb.DELETE,
@@ -91,11 +91,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *watchableStore) TxnBegin() int64 {
 func (s *watchableStore) TxnBegin() int64 {
 	s.mu.Lock()
 	s.mu.Lock()
 	s.tx = newOngoingTx()
 	s.tx = newOngoingTx()
-	return s.KV.TxnBegin()
+	return s.store.TxnBegin()
 }
 }
 
 
 func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
 func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
-	rev, err = s.KV.TxnPut(txnID, key, value)
+	rev, err = s.store.TxnPut(txnID, key, value)
 	if err == nil {
 	if err == nil {
 		s.tx.put(string(key))
 		s.tx.put(string(key))
 	}
 	}
@@ -103,11 +103,11 @@ func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err
 }
 }
 
 
 func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
 func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0)
+	kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0)
 	if err != nil {
 	if err != nil {
 		log.Panicf("unexpected range error (%v)", err)
 		log.Panicf("unexpected range error (%v)", err)
 	}
 	}
-	n, rev, err = s.KV.TxnDeleteRange(txnID, key, end)
+	n, rev, err = s.store.TxnDeleteRange(txnID, key, end)
 	if err == nil {
 	if err == nil {
 		for _, kv := range kvs {
 		for _, kv := range kvs {
 			s.tx.del(string(kv.Key))
 			s.tx.del(string(kv.Key))
@@ -117,14 +117,14 @@ func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev in
 }
 }
 
 
 func (s *watchableStore) TxnEnd(txnID int64) error {
 func (s *watchableStore) TxnEnd(txnID int64) error {
-	err := s.KV.TxnEnd(txnID)
+	err := s.store.TxnEnd(txnID)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	_, rev, _ := s.KV.Range(nil, nil, 0, 0)
+	_, rev, _ := s.store.Range(nil, nil, 0, 0)
 	for k := range s.tx.putm {
 	for k := range s.tx.putm {
-		kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0)
+		kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
 		if err != nil {
 		if err != nil {
 			log.Panicf("unexpected range error (%v)", err)
 			log.Panicf("unexpected range error (%v)", err)
 		}
 		}
@@ -148,7 +148,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 func (s *watchableStore) Close() error {
 func (s *watchableStore) Close() error {
 	close(s.stopc)
 	close(s.stopc)
 	s.wg.Wait()
 	s.wg.Wait()
-	return s.KV.Close()
+	return s.store.Close()
 }
 }
 
 
 func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) {
 func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) {
@@ -211,7 +211,7 @@ func (s *watchableStore) syncWatchersLoop() {
 
 
 // syncWatchers syncs the watchers in the unsyncd map.
 // syncWatchers syncs the watchers in the unsyncd map.
 func (s *watchableStore) syncWatchers() {
 func (s *watchableStore) syncWatchers() {
-	_, curRev, _ := s.KV.Range(nil, nil, 0, 0)
+	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
 
 
 	// filtering without allocating
 	// filtering without allocating
 	// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
 	// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
@@ -229,7 +229,7 @@ func (s *watchableStore) syncWatchers() {
 			nws = append(nws, w)
 			nws = append(nws, w)
 			continue
 			continue
 		}
 		}
-		evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur)
+		evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
 		if err != nil {
 		if err != nil {
 			w.stopWithError(err)
 			w.stopWithError(err)
 			continue
 			continue