Kaynağa Gözat

Merge pull request #8052 from heyitsanthony/watch-victim-test

mvcc: test watch victim/delay path
Anthony Romano 8 yıl önce
ebeveyn
işleme
300feea177
2 değiştirilmiş dosya ile 84 ekleme ve 1 silme
  1. 2 1
      mvcc/watchable_store.go
  2. 82 0
      mvcc/watchable_store_test.go

+ 2 - 1
mvcc/watchable_store.go

@@ -23,7 +23,8 @@ import (
 	"github.com/coreos/etcd/mvcc/mvccpb"
 )
 
-const (
+// non-const so modifiable by tests
+var (
 	// chanBufLen is the length of the buffered chan
 	// for sending out watched events.
 	// TODO: find a good buf value. 1024 is just a random one that

+ 82 - 0
mvcc/watchable_store_test.go

@@ -16,8 +16,10 @@ package mvcc
 
 import (
 	"bytes"
+	"fmt"
 	"os"
 	"reflect"
+	"sync"
 	"testing"
 	"time"
 
@@ -424,3 +426,83 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
 		}
 	}
 }
+
+// TestWatchVictims tests that watchable store delivers watch events
+// when the watch channel is temporarily clogged with too many events.
+func TestWatchVictims(t *testing.T) {
+	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
+
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+		chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
+	}()
+
+	chanBufLen, maxWatchersPerSync = 1, 2
+	numPuts := chanBufLen * 64
+	testKey, testValue := []byte("foo"), []byte("bar")
+
+	var wg sync.WaitGroup
+	numWatches := maxWatchersPerSync * 128
+	errc := make(chan error, numWatches)
+	wg.Add(numWatches)
+	for i := 0; i < numWatches; i++ {
+		go func() {
+			w := s.NewWatchStream()
+			w.Watch(testKey, nil, 1)
+			defer func() {
+				w.Close()
+				wg.Done()
+			}()
+			tc := time.After(10 * time.Second)
+			evs, nextRev := 0, int64(2)
+			for evs < numPuts {
+				select {
+				case <-tc:
+					errc <- fmt.Errorf("time out")
+					return
+				case wr := <-w.Chan():
+					evs += len(wr.Events)
+					for _, ev := range wr.Events {
+						if ev.Kv.ModRevision != nextRev {
+							errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision)
+							return
+						}
+						nextRev++
+					}
+					time.Sleep(time.Millisecond)
+				}
+			}
+			if evs != numPuts {
+				errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs)
+				return
+			}
+			select {
+			case <-w.Chan():
+				errc <- fmt.Errorf("unexpected response")
+			default:
+			}
+		}()
+		time.Sleep(time.Millisecond)
+	}
+
+	var wgPut sync.WaitGroup
+	wgPut.Add(numPuts)
+	for i := 0; i < numPuts; i++ {
+		go func() {
+			defer wgPut.Done()
+			s.Put(testKey, testValue, lease.NoLease)
+		}()
+	}
+	wgPut.Wait()
+
+	wg.Wait()
+	select {
+	case err := <-errc:
+		t.Fatal(err)
+	default:
+	}
+}