Browse Source

mvcc: test concurrently closing watch streams and canceling watches

Triggers a race that causes a write to a closed watch stream channel.
Anthony Romano 8 years ago
parent
commit
bd53ae5680
1 changed files with 46 additions and 0 deletions
  1. 46 0
      mvcc/watchable_store_test.go

+ 46 - 0
mvcc/watchable_store_test.go

@@ -539,3 +539,49 @@ func TestWatchVictims(t *testing.T) {
 	default:
 	}
 }
+
+// TestStressWatchCancelClose tests closing a watch stream while
+// canceling its watches.
+func TestStressWatchCancelClose(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	testKey, testValue := []byte("foo"), []byte("bar")
+	var wg sync.WaitGroup
+	readyc := make(chan struct{})
+	wg.Add(100)
+	for i := 0; i < 100; i++ {
+		go func() {
+			defer wg.Done()
+			w := s.NewWatchStream()
+			ids := make([]WatchID, 10)
+			for i := range ids {
+				ids[i] = w.Watch(testKey, nil, 0)
+			}
+			<-readyc
+			wg.Add(1 + len(ids)/2)
+			for i := range ids[:len(ids)/2] {
+				go func(n int) {
+					defer wg.Done()
+					w.Cancel(ids[n])
+				}(i)
+			}
+			go func() {
+				defer wg.Done()
+				w.Close()
+			}()
+		}()
+	}
+
+	close(readyc)
+	for i := 0; i < 100; i++ {
+		s.Put(testKey, testValue, lease.NoLease)
+	}
+
+	wg.Wait()
+}