Browse Source

Merge pull request #3899 from gyuho/3859_synced_map

storage: use map for watchableStore synced
Xiang Li 10 years ago
parent
commit
2de9a5bbd0
3 changed files with 189 additions and 25 deletions
  1. 55 24
      storage/watchable_store.go
  2. 37 1
      storage/watchable_store_bench_test.go
  3. 97 0
      storage/watchable_store_test.go

+ 55 - 24
storage/watchable_store.go

@@ -15,6 +15,7 @@
 package storage
 
 import (
+	"fmt"
 	"log"
 	"sync"
 	"time"
@@ -44,7 +45,7 @@ type watchableStore struct {
 
 	// contains all synced watching that are tracking the events that will happen
 	// The key of the map is the key that the watching is watching on.
-	synced map[string][]*watching
+	synced map[string]map[*watching]struct{}
 	tx     *ongoingTx
 
 	stopc chan struct{}
@@ -55,7 +56,7 @@ func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 		store:    newStore(path),
 		unsynced: make(map[*watching]struct{}),
-		synced:   make(map[string][]*watching),
+		synced:   make(map[string]map[*watching]struct{}),
 		stopc:    make(chan struct{}),
 	}
 	s.wg.Add(1)
@@ -185,7 +186,9 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
 
 	k := string(key)
 	if startRev == 0 {
-		s.synced[k] = append(s.synced[k], wa)
+		if err := unsafeAddWatching(&s.synced, k, wa); err != nil {
+			log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
+		}
 	} else {
 		slowWatchingGauge.Inc()
 		s.unsynced[wa] = struct{}{}
@@ -203,9 +206,14 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
 			return
 		}
 
-		for i, w := range s.synced[k] {
-			if w == wa {
-				s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
+		if v, ok := s.synced[k]; ok {
+			if _, ok := v[wa]; ok {
+				delete(v, wa)
+				// if there is nothing in s.synced[k],
+				// remove the key from the synced
+				if len(v) == 0 {
+					delete(s.synced, k)
+				}
 				watchingGauge.Dec()
 			}
 		}
@@ -272,7 +280,10 @@ func (s *watchableStore) syncWatchings() {
 		}
 		// switch to tracking future events if needed
 		if nextRev > curRev {
-			s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
+			k := string(w.key)
+			if err := unsafeAddWatching(&s.synced, k, w); err != nil {
+				log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
+			}
 			delete(s.unsynced, w)
 			continue
 		}
@@ -292,25 +303,25 @@ func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
 func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
 	// check all prefixes of the key to notify all corresponded watchings
 	for i := 0; i <= len(ev.Kv.Key); i++ {
-		ws := s.synced[string(ev.Kv.Key[:i])]
-		nws := ws[:0]
-		for _, w := range ws {
-			// the watching needs to be notified when either it watches prefix or
-			// the key is exactly matched.
-			if !w.prefix && i != len(ev.Kv.Key) {
-				continue
-			}
-			select {
-			case w.ch <- ev:
-				pendingEventsGauge.Inc()
-				nws = append(nws, w)
-			default:
-				w.cur = rev
-				s.unsynced[w] = struct{}{}
-				slowWatchingGauge.Inc()
+		k := string(ev.Kv.Key[:i])
+		if wm, ok := s.synced[k]; ok {
+			for w := range wm {
+				// the watching needs to be notified when either it watches prefix or
+				// the key is exactly matched.
+				if !w.prefix && i != len(ev.Kv.Key) {
+					continue
+				}
+				select {
+				case w.ch <- ev:
+					pendingEventsGauge.Inc()
+				default:
+					w.cur = rev
+					s.unsynced[w] = struct{}{}
+					delete(wm, w)
+					slowWatchingGauge.Inc()
+				}
 			}
 		}
-		s.synced[string(ev.Kv.Key[:i])] = nws
 	}
 }
 
@@ -356,3 +367,23 @@ type watching struct {
 	// The chan might be shared with other watchings.
 	ch chan<- storagepb.Event
 }
+
+// unsafeAddWatching puts watching with key k into watchableStore's synced.
+// Make sure to this is thread-safe using mutex before and after.
+func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error {
+	if wa == nil {
+		return fmt.Errorf("nil watching received")
+	}
+	mp := *synced
+	if v, ok := mp[k]; ok {
+		if _, ok := v[wa]; ok {
+			return fmt.Errorf("put the same watch twice: %+v", wa)
+		} else {
+			v[wa] = struct{}{}
+		}
+
+	}
+	mp[k] = make(map[*watching]struct{})
+	mp[k][wa] = struct{}{}
+	return nil
+}

+ 37 - 1
storage/watchable_store_bench_test.go

@@ -44,7 +44,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
-		synced: make(map[string][]*watching),
+		synced: make(map[string]map[*watching]struct{}),
 	}
 
 	defer func() {
@@ -81,3 +81,39 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 		cancels[idx]()
 	}
 }
+
+func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
+	s := newWatchableStore(tmpPath)
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	// Put a key so that we can spawn watchers on that key
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue)
+
+	w := s.NewWatcher()
+
+	// put 1 million watchers on the same key
+	const watcherSize = 1000000
+
+	cancels := make([]CancelFunc, watcherSize)
+	for i := 0; i < watcherSize; i++ {
+		// 0 for startRev to keep watchers in synced
+		cancel := w.Watch(testKey, true, 0)
+		cancels[i] = cancel
+	}
+
+	// randomly cancel watchers to make it not biased towards
+	// data structures with an order, such as slice.
+	ix := rand.Perm(watcherSize)
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for _, idx := range ix {
+		cancels[idx]()
+	}
+}

+ 97 - 0
storage/watchable_store_test.go

@@ -0,0 +1,97 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"os"
+	"testing"
+)
+
+func TestWatch(t *testing.T) {
+	s := newWatchableStore(tmpPath)
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue)
+
+	w := s.NewWatcher()
+	w.Watch(testKey, true, 0)
+
+	if _, ok := s.synced[string(testKey)]; !ok {
+		// the key must have had an entry in synced
+		t.Errorf("existence = %v, want true", ok)
+	}
+}
+
+func TestNewWatcherCancel(t *testing.T) {
+	s := newWatchableStore(tmpPath)
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue)
+
+	w := s.NewWatcher()
+	cancel := w.Watch(testKey, true, 0)
+
+	cancel()
+
+	if _, ok := s.synced[string(testKey)]; ok {
+		// the key shoud have been deleted
+		t.Errorf("existence = %v, want false", ok)
+	}
+}
+
+func TestUnsafeAddWatching(t *testing.T) {
+	s := newWatchableStore(tmpPath)
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue)
+
+	wa := &watching{
+		key:    testKey,
+		prefix: true,
+		cur:    0,
+	}
+
+	if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil {
+		t.Error(err)
+	}
+
+	if v, ok := s.synced[string(testKey)]; !ok {
+		// the key must have had entry in synced
+		t.Errorf("existence = %v, want true", ok)
+	} else {
+		if len(v) != 1 {
+			// the key must have ONE entry in its watching map
+			t.Errorf("len(v) = %d, want 1", len(v))
+		}
+	}
+
+	if err := unsafeAddWatching(&s.synced, string(testKey), wa); err == nil {
+		// unsafeAddWatching should have returned error
+		// when putting the same watch twice"
+		t.Error(`error = nil, want "put the same watch twice"`)
+	}
+}