|
@@ -810,6 +810,21 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
|
|
assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
|
|
assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// Ensure that slow consumers are handled properly.
|
|
|
|
|
+//
|
|
|
|
|
+// Since Watcher.EventChan has a buffer of size 1 we can only queue 1
|
|
|
|
|
+// event per watcher. If the consumer cannot consume the event on time and
|
|
|
|
|
+// another event arrives, the channel is closed and event is discarded.
|
|
|
|
|
+// This test ensures that after closing the channel, the store can continue
|
|
|
|
|
+// to operate correctly.
|
|
|
|
|
+func TestStoreWatchSlowConsumer(t *testing.T) {
|
|
|
|
|
+ s := newStore()
|
|
|
|
|
+ s.Watch("/foo", true, true, 0) // stream must be true
|
|
|
|
|
+ s.Set("/foo", false, "1", Permanent) // ok
|
|
|
|
|
+ s.Set("/foo", false, "2", Permanent) // ok
|
|
|
|
|
+ s.Set("/foo", false, "3", Permanent) // must not panic
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// Performs a non-blocking select on an event channel.
|
|
// Performs a non-blocking select on an event channel.
|
|
|
func nbselect(c <-chan *Event) *Event {
|
|
func nbselect(c <-chan *Event) *Event {
|
|
|
select {
|
|
select {
|