Browse Source

Merge pull request #387 from xiangli-cmu/fix_expire_notify

Fix expire notify
Xiang Li 12 years ago
parent
commit
f66bd1689d
2 changed files with 58 additions and 4 deletions
  1. 48 0
      server/v2/tests/get_handler_test.go
  2. 10 4
      store/store.go

+ 48 - 0
server/v2/tests/get_handler_test.go

@@ -173,3 +173,51 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})
 }
+
+// Ensures that a watcher can wait for a value to be set after a given index.
+//
+//   $ curl localhost:4001/v2/keys/keyindir/bar?wait=true
+//   $ curl -X PUT localhost:4001/v2/keys/keyindir -d dir=true -d ttl=1
+//   $ curl -X PUT localhost:4001/v2/keys/keyindir/bar -d value=YYY
+//
+func TestV2WatchKeyInDir(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		var body map[string]interface{}
+		c := make(chan bool)
+
+		// Set a value (before given index).
+		v := url.Values{}
+		v.Set("dir", "true")
+		v.Set("ttl", "1")
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/keyindir"), v)
+		tests.ReadBody(resp)
+
+		// Set a value (before given index).
+		v = url.Values{}
+		v.Set("value", "XXX")
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/keyindir/bar"), v)
+		tests.ReadBody(resp)
+
+		go func() {
+			resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/keyindir/bar?wait=true"))
+			body = tests.ReadBodyJSON(resp)
+			c <- true
+		}()
+
+		// wait for expiration, we do have a up to 500 millisecond delay
+		time.Sleep(1500 * time.Millisecond)
+
+		select {
+		case <-c:
+
+		default:
+			t.Fatal("cannot get watch result")
+		}
+
+		assert.NotNil(t, body, "")
+		assert.Equal(t, body["action"], "expire", "")
+
+		node := body["node"].(map[string]interface{})
+		assert.Equal(t, node["key"], "/keyindir", "")
+	})
+}

+ 10 - 4
store/store.go

@@ -512,13 +512,19 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
 			break
 		}
 
-		s.ttlKeyHeap.pop()
-		node.Remove(true, true, nil)
-
 		s.CurrentIndex++
+		e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
+
+		callback := func(path string) { // notify function
+			// notify the watchers with deleted set true
+			s.WatcherHub.notifyWatchers(e, path, true)
+		}
+
+		s.ttlKeyHeap.pop()
+		node.Remove(true, true, callback)
 
 		s.Stats.Inc(ExpireCount)
-		s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex))
+		s.WatcherHub.notify(e)
 	}
 
 }