Browse Source

concurrency: don't skip leader updates in Observe()

The Get for the leader key will fetch based on the latest revision
instead of the deletion revision, missing leader updates between
the delete and the Get.

Although it's usually safe to skip these updates since they're
stale, it makes testing more difficult and in some cases the
full leader update history is desirable.
Anthony Romano 8 years ago
parent
commit
4b4f5be74a
1 changed files with 7 additions and 3 deletions
  1. 7 3
      clientv3/concurrency/election.go

+ 7 - 3
clientv3/concurrency/election.go

@@ -161,20 +161,21 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 	client := e.session.Client()
 
 	defer close(ch)
+	lastRev := int64(0)
 	for {
-		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+		opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
+		resp, err := client.Get(ctx, e.keyPrefix, opts...)
 		if err != nil {
 			return
 		}
 
 		var kv *mvccpb.KeyValue
 
-		cctx, cancel := context.WithCancel(ctx)
 		if len(resp.Kvs) == 0 {
+			cctx, cancel := context.WithCancel(ctx)
 			// wait for first key put on prefix
 			opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
 			wch := client.Watch(cctx, e.keyPrefix, opts...)
-
 			for kv == nil {
 				wr, ok := <-wch
 				if !ok || wr.Err() != nil {
@@ -189,10 +190,12 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 					}
 				}
 			}
+			cancel()
 		} else {
 			kv = resp.Kvs[0]
 		}
 
+		cctx, cancel := context.WithCancel(ctx)
 		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
 		keyDeleted := false
 		for !keyDeleted {
@@ -202,6 +205,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 			}
 			for _, ev := range wr.Events {
 				if ev.Type == mvccpb.DELETE {
+					lastRev = ev.Kv.ModRevision
 					keyDeleted = true
 					break
 				}