Browse Source

clientv3: add send created notification

Xiang Li 9 years ago
parent
commit
33c3583b50
3 changed files with 59 additions and 8 deletions
  1. 18 0
      clientv3/integration/watch_test.go
  2. 13 0
      clientv3/op.go
  3. 28 8
      clientv3/watch.go

+ 18 - 0
clientv3/integration/watch_test.go

@@ -709,3 +709,21 @@ func TestWatchWithFilter(t *testing.T) {
 	case <-time.After(100 * time.Millisecond):
 	}
 }
+
+// TestWatchWithCreatedNotification checks that createdNotification works.
+func TestWatchWithCreatedNotification(t *testing.T) {
+	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(t)
+
+	client := cluster.RandClient()
+
+	ctx := context.Background()
+
+	createC := client.Watch(ctx, "a", clientv3.WithCreatedNotify())
+
+	resp := <-createC
+
+	if !resp.Created {
+		t.Fatalf("expected created event, got %v", resp)
+	}
+}

+ 13 - 0
clientv3/op.go

@@ -50,6 +50,8 @@ type Op struct {
 
 	// progressNotify is for progress updates.
 	progressNotify bool
+	// createdNotify is for created event
+	createdNotify bool
 	// filters for watchers
 	filterPut    bool
 	filterDelete bool
@@ -116,6 +118,8 @@ func OpDelete(key string, opts ...OpOption) Op {
 		panic("unexpected countOnly in delete")
 	case ret.filterDelete, ret.filterPut:
 		panic("unexpected filter in delete")
+	case ret.createdNotify:
+		panic("unexpected createdNotify in delete")
 	}
 	return ret
 }
@@ -138,6 +142,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
 		panic("unexpected countOnly in put")
 	case ret.filterDelete, ret.filterPut:
 		panic("unexpected filter in put")
+	case ret.createdNotify:
+		panic("unexpected createdNotify in put")
 	}
 	return ret
 }
@@ -281,6 +287,13 @@ func WithProgressNotify() OpOption {
 	}
 }
 
+// WithCreatedNotify makes watch server sends the created event.
+func WithCreatedNotify() OpOption {
+	return func(op *Op) {
+		op.createdNotify = true
+	}
+}
+
 // WithFilterPut discards PUT events from the watcher.
 func WithFilterPut() OpOption {
 	return func(op *Op) { op.filterPut = true }

+ 28 - 8
clientv3/watch.go

@@ -61,6 +61,9 @@ type WatchResponse struct {
 	// the channel sends a final response that has Canceled set to true with a non-nil Err().
 	Canceled bool
 
+	// Created is used to indicate the creation of the watcher.
+	Created bool
+
 	closeErr error
 }
 
@@ -98,6 +101,7 @@ type watcher struct {
 
 	// mu protects the grpc streams map
 	mu sync.RWMutex
+
 	// streams holds all the active grpc streams keyed by ctx value.
 	streams map[string]*watchGrpcStream
 }
@@ -138,6 +142,8 @@ type watchRequest struct {
 	key string
 	end string
 	rev int64
+	// send created notification event if this field is true
+	createdNotify bool
 	// progressNotify is for progress updates
 	progressNotify bool
 	// filters is the list of events to filter out
@@ -223,6 +229,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 
 	wr := &watchRequest{
 		ctx:            ctx,
+		createdNotify:  ow.createdNotify,
 		key:            string(ow.key),
 		end:            string(ow.end),
 		rev:            ow.rev,
@@ -418,6 +425,7 @@ func (w *watchGrpcStream) run() {
 				w.addStream(pbresp, pendingReq)
 				pendingReq = nil
 				curReqC = w.reqc
+				w.dispatchEvent(pbresp)
 			case pbresp.Canceled:
 				delete(cancelSet, pbresp.WatchId)
 				// shutdown serveStream, if any
@@ -489,19 +497,23 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 	ws, ok := w.streams[pbresp.WatchId]
+	if !ok {
+		return false
+	}
+
 	events := make([]*Event, len(pbresp.Events))
 	for i, ev := range pbresp.Events {
 		events[i] = (*Event)(ev)
 	}
-	if ok {
-		wr := &WatchResponse{
-			Header:          *pbresp.Header,
-			Events:          events,
-			CompactRevision: pbresp.CompactRevision,
-			Canceled:        pbresp.Canceled}
-		ws.recvc <- wr
+	wr := &WatchResponse{
+		Header:          *pbresp.Header,
+		Events:          events,
+		CompactRevision: pbresp.CompactRevision,
+		Created:         pbresp.Created,
+		Canceled:        pbresp.Canceled,
 	}
-	return ok
+	ws.recvc <- wr
+	return true
 }
 
 // serveWatchClient forwards messages from the grpc stream to run()
@@ -533,6 +545,14 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 	for !closing {
 		curWr := emptyWr
 		outc := ws.outc
+
+		// ignore created event if create notify is not requested or
+		// we already sent the initial created event (when we are on the resume path).
+		if len(wrs) > 0 && wrs[0].Created &&
+			(!ws.initReq.createdNotify || ws.lastRev != 0) {
+			wrs = wrs[1:]
+		}
+
 		if len(wrs) > 0 {
 			curWr = wrs[0]
 		} else {