|
|
@@ -789,7 +789,7 @@ func TestStoreWatchExpireRefresh(t *testing.T) {
|
|
|
w, _ = s.Watch("/", true, false, 4)
|
|
|
fc.Advance(700 * time.Millisecond)
|
|
|
s.DeleteExpiredKeys(fc.Now())
|
|
|
- eidx = 5 // We should skip 4 because a TTL update should occur with no watch notification
|
|
|
+ eidx = 5 // We should skip 4 because a TTL update should occur with no watch notification if set `TTLOptionSet.Refresh` to true
|
|
|
assert.Equal(t, w.StartIndex(), eidx-1, "")
|
|
|
e = nbselect(w.EventChan())
|
|
|
assert.Equal(t, e.EtcdIndex, eidx, "")
|
|
|
@@ -813,7 +813,7 @@ func TestStoreWatchExpireEmptyRefresh(t *testing.T) {
|
|
|
w, _ := s.Watch("/", true, false, 2)
|
|
|
fc.Advance(700 * time.Millisecond)
|
|
|
s.DeleteExpiredKeys(fc.Now())
|
|
|
- eidx = 3 // We should skip 2 because a TTL update should occur with no watch notification
|
|
|
+ eidx = 3 // We should skip 2 because a TTL update should occur with no watch notification if set `TTLOptionSet.Refresh` to true
|
|
|
assert.Equal(t, w.StartIndex(), eidx-1, "")
|
|
|
e := nbselect(w.EventChan())
|
|
|
assert.Equal(t, e.EtcdIndex, eidx, "")
|
|
|
@@ -822,6 +822,32 @@ func TestStoreWatchExpireEmptyRefresh(t *testing.T) {
|
|
|
assert.Equal(t, *e.PrevNode.Value, "bar", "")
|
|
|
}
|
|
|
|
|
|
+// Update TTL of a key (set TTLOptionSet.Refresh to false) and send notification
|
|
|
+func TestStoreWatchNoRefresh(t *testing.T) {
|
|
|
+ s := newStore()
|
|
|
+ fc := newFakeClock()
|
|
|
+ s.clock = fc
|
|
|
+
|
|
|
+ var eidx uint64 = 1
|
|
|
+ s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true})
|
|
|
+ // Should be no-op
|
|
|
+ fc.Advance(200 * time.Millisecond)
|
|
|
+ s.DeleteExpiredKeys(fc.Now())
|
|
|
+
|
|
|
+ // Update key's TTL with setting `TTLOptionSet.Refresh` to false will cause an update event
|
|
|
+ s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: false})
|
|
|
+ w, _ := s.Watch("/", true, false, 2)
|
|
|
+ fc.Advance(700 * time.Millisecond)
|
|
|
+ s.DeleteExpiredKeys(fc.Now())
|
|
|
+ eidx = 2
|
|
|
+ assert.Equal(t, w.StartIndex(), eidx, "")
|
|
|
+ e := nbselect(w.EventChan())
|
|
|
+ assert.Equal(t, e.EtcdIndex, eidx, "")
|
|
|
+ assert.Equal(t, e.Action, "update", "")
|
|
|
+ assert.Equal(t, e.Node.Key, "/foo", "")
|
|
|
+ assert.Equal(t, *e.PrevNode.Value, "bar", "")
|
|
|
+}
|
|
|
+
|
|
|
// Ensure that the store can update the TTL on a value with refresh.
|
|
|
func TestStoreRefresh(t *testing.T) {
|
|
|
s := newStore()
|
|
|
@@ -1046,17 +1072,23 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
|
|
|
|
|
// Ensure that slow consumers are handled properly.
|
|
|
//
|
|
|
-// Since Watcher.EventChan() has a buffer of size 1 we can only queue 1
|
|
|
+// Since Watcher.EventChan() has a buffer of size 100 we can only queue 100
|
|
|
// event per watcher. If the consumer cannot consume the event on time and
|
|
|
// another event arrives, the channel is closed and event is discarded.
|
|
|
// This test ensures that after closing the channel, the store can continue
|
|
|
// to operate correctly.
|
|
|
func TestStoreWatchSlowConsumer(t *testing.T) {
|
|
|
s := newStore()
|
|
|
- s.Watch("/foo", true, true, 0) // stream must be true
|
|
|
- s.Set("/foo", false, "1", TTLOptionSet{ExpireTime: Permanent}) // ok
|
|
|
- s.Set("/foo", false, "2", TTLOptionSet{ExpireTime: Permanent}) // ok
|
|
|
- s.Set("/foo", false, "3", TTLOptionSet{ExpireTime: Permanent}) // must not panic
|
|
|
+ s.Watch("/foo", true, true, 0) // stream must be true
|
|
|
+ // Fill watch channel with 100 events
|
|
|
+ for i := 1; i <= 100; i++ {
|
|
|
+ s.Set("/foo", false, string(i), TTLOptionSet{ExpireTime: Permanent}) // ok
|
|
|
+ }
|
|
|
+ assert.Equal(t, s.WatcherHub.count, int64(1), "")
|
|
|
+ s.Set("/foo", false, "101", TTLOptionSet{ExpireTime: Permanent}) // ok
|
|
|
+ // remove watcher
|
|
|
+ assert.Equal(t, s.WatcherHub.count, int64(0), "")
|
|
|
+ s.Set("/foo", false, "102", TTLOptionSet{ExpireTime: Permanent}) // must not panic
|
|
|
}
|
|
|
|
|
|
// Performs a non-blocking select on an event channel.
|