Browse Source

Merge pull request #6647 from gyuho/watch

clientv3: send create event over outc
Gyu-Ho Lee 9 years ago
parent
commit
1b36162659
2 changed files with 55 additions and 13 deletions
  1. 30 0
      clientv3/integration/watch_test.go
  2. 25 13
      clientv3/watch.go

+ 30 - 0
clientv3/integration/watch_test.go

@@ -771,6 +771,36 @@ func TestWatchWithCreatedNotification(t *testing.T) {
 	}
 }
 
+// TestWatchWithCreatedNotificationDropConn ensures that
+// a watcher with created notify does not post duplicate
+// created events from disconnect.
+func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
+	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(t)
+
+	client := cluster.RandClient()
+
+	wch := client.Watch(context.Background(), "a", clientv3.WithCreatedNotify())
+
+	resp := <-wch
+
+	if !resp.Created {
+		t.Fatalf("expected created event, got %v", resp)
+	}
+
+	cluster.Members[0].DropConnections()
+
+	// try to receive from watch channel again
+	// ensure it doesn't post another createNotify
+	select {
+	case wresp := <-wch:
+		t.Fatalf("got unexpected watch response: %+v\n", wresp)
+	case <-time.After(time.Second):
+		// watcher may not reconnect by the time it hits the select,
+		// so it wouldn't have a chance to filter out the second create event
+	}
+}
+
 // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
 func TestWatchCancelOnServer(t *testing.T) {
 	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})

+ 25 - 13
clientv3/watch.go

@@ -586,17 +586,6 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 		curWr := emptyWr
 		outc := ws.outc
 
-		if len(ws.buf) > 0 && ws.buf[0].Created {
-			select {
-			case ws.initReq.retc <- ws.outc:
-				// send first creation event and only if requested
-				if !ws.initReq.createdNotify {
-					ws.buf = ws.buf[1:]
-				}
-			default:
-			}
-		}
-
 		if len(ws.buf) > 0 {
 			curWr = ws.buf[0]
 		} else {
@@ -614,13 +603,36 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 				// shutdown from closeSubstream
 				return
 			}
-			// TODO pause channel if buffer gets too large
-			ws.buf = append(ws.buf, wr)
+
+			if wr.Created {
+				if ws.initReq.retc != nil {
+					ws.initReq.retc <- ws.outc
+					// to prevent next write from taking the slot in buffered channel
+					// and posting duplicate create events
+					ws.initReq.retc = nil
+
+					// send first creation event only if requested
+					if ws.initReq.createdNotify {
+						ws.outc <- *wr
+					}
+				}
+			}
+
 			nextRev = wr.Header.Revision
 			if len(wr.Events) > 0 {
 				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 			}
 			ws.initReq.rev = nextRev
+
+			// created event is already sent above,
+			// watcher should not post duplicate events
+			if wr.Created {
+				continue
+			}
+
+			// TODO pause channel if buffer gets too large
+			ws.buf = append(ws.buf, wr)
+
 		case <-ws.initReq.ctx.Done():
 			return
 		case <-resumec: