Bläddra i källkod

Merge pull request #7795 from heyitsanthony/dont-force-initrev

clientv3: only update initReq.rev == 0 with watch revision
Anthony Romano 8 år sedan
förälder
incheckning
7da451640f
4 ändrade filer med 106 tillägg och 10 borttagningar
  1. 51 1
      clientv3/integration/watch_test.go
  2. 10 1
      clientv3/watch.go
  3. 42 7
      integration/bridge.go
  4. 3 1
      integration/cluster.go

+ 51 - 1
clientv3/integration/watch_test.go

@@ -343,7 +343,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 	}
 }
 
-// TestWatchResumeComapcted checks that the watcher gracefully closes in case
+func TestWatchResumeInitRev(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
+		t.Fatal(err)
+	}
+	// if resume is broken, it'll pick up this key first instead of a=3
+	if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
+		t.Fatal(err)
+	}
+
+	wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
+	if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
+		t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
+	}
+	// pause wch
+	clus.Members[0].DropConnections()
+	clus.Members[0].PauseConnections()
+
+	select {
+	case resp, ok := <-wch:
+		t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	// resume wch
+	clus.Members[0].UnpauseConnections()
+
+	select {
+	case resp, ok := <-wch:
+		if !ok {
+			t.Fatal("unexpected watch close")
+		}
+		if len(resp.Events) == 0 {
+			t.Fatal("expected event on watch")
+		}
+		if string(resp.Events[0].Kv.Value) != "3" {
+			t.Fatalf("expected value=3, got event %+v", resp.Events[0])
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("watch timed out")
+	}
+}
+
+// TestWatchResumeCompacted checks that the watcher gracefully closes in case
 // that it tries to resume to a revision that's been compacted out of the store.
 // Since the watcher's server restarts with stale data, the watcher will receive
 // either a compaction error or all keys by staying in sync before the compaction

+ 10 - 1
clientv3/watch.go

@@ -615,11 +615,20 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 					// send first creation event only if requested
 					if ws.initReq.createdNotify {
 						ws.outc <- *wr
+						if ws.initReq.rev == 0 {
+							// current revision of store; returning the
+							// create response binds the current revision to
+							// this revision, so restart with it if there's a
+							// disconnect before receiving any events.
+							nextRev = wr.Header.Revision
+						}
 					}
 				}
+			} else {
+				// current progress of watch; <= store revision
+				nextRev = wr.Header.Revision
 			}
 
-			nextRev = wr.Header.Revision
 			if len(wr.Events) > 0 {
 				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 			}

+ 42 - 7
integration/bridge.go

@@ -31,8 +31,9 @@ type bridge struct {
 	l       net.Listener
 	conns   map[*bridgeConn]struct{}
 
-	stopc chan struct{}
-	wg    sync.WaitGroup
+	stopc  chan struct{}
+	pausec chan struct{}
+	wg     sync.WaitGroup
 
 	mu sync.Mutex
 }
@@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
 		inaddr:  addr + "0",
 		outaddr: addr,
 		conns:   make(map[*bridgeConn]struct{}),
-		stopc:   make(chan struct{}, 1),
+		stopc:   make(chan struct{}),
+		pausec:  make(chan struct{}),
 	}
+	close(b.pausec)
+
 	l, err := transport.NewUnixListener(b.inaddr)
 	if err != nil {
 		return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
@@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }
 
 func (b *bridge) Close() {
 	b.l.Close()
+	b.mu.Lock()
 	select {
-	case b.stopc <- struct{}{}:
+	case <-b.stopc:
 	default:
+		close(b.stopc)
 	}
+	b.mu.Unlock()
 	b.wg.Wait()
 }
 
@@ -75,6 +82,22 @@ func (b *bridge) Reset() {
 	b.conns = make(map[*bridgeConn]struct{})
 }
 
+func (b *bridge) Pause() {
+	b.mu.Lock()
+	b.pausec = make(chan struct{})
+	b.mu.Unlock()
+}
+
+func (b *bridge) Unpause() {
+	b.mu.Lock()
+	select {
+	case <-b.pausec:
+	default:
+		close(b.pausec)
+	}
+	b.mu.Unlock()
+}
+
 func (b *bridge) serveListen() {
 	defer func() {
 		b.l.Close()
@@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
 		if ierr != nil {
 			return
 		}
+		b.mu.Lock()
+		pausec := b.pausec
+		b.mu.Unlock()
+		select {
+		case <-b.stopc:
+			return
+		case <-pausec:
+		}
+
 		outc, oerr := net.Dial("unix", b.outaddr)
 		if oerr != nil {
 			inc.Close()
 			return
 		}
 
-		bc := &bridgeConn{inc, outc}
+		bc := &bridgeConn{inc, outc, make(chan struct{})}
 		b.wg.Add(1)
 		b.mu.Lock()
 		b.conns[bc] = struct{}{}
@@ -108,6 +140,7 @@ func (b *bridge) serveListen() {
 
 func (b *bridge) serveConn(bc *bridgeConn) {
 	defer func() {
+		close(bc.donec)
 		bc.Close()
 		b.mu.Lock()
 		delete(b.conns, bc)
@@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 }
 
 type bridgeConn struct {
-	in  net.Conn
-	out net.Conn
+	in    net.Conn
+	out   net.Conn
+	donec chan struct{}
 }
 
 func (bc *bridgeConn) Close() {
 	bc.in.Close()
 	bc.out.Close()
+	<-bc.donec
 }

+ 3 - 1
integration/cluster.go

@@ -556,7 +556,9 @@ func (m *member) electionTimeout() time.Duration {
 	return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
 }
 
-func (m *member) DropConnections() { m.grpcBridge.Reset() }
+func (m *member) DropConnections()    { m.grpcBridge.Reset() }
+func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
+func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
 
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {