watch.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "io"
  17. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  18. "github.com/coreos/etcd/storage"
  19. )
  20. type watchServer struct {
  21. watchable storage.Watchable
  22. }
  23. func NewWatchServer(w storage.Watchable) pb.WatchServer {
  24. return &watchServer{w}
  25. }
  26. func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
  27. closec := make(chan struct{})
  28. defer close(closec)
  29. watcher := ws.watchable.NewWatcher()
  30. defer watcher.Close()
  31. go sendLoop(stream, watcher, closec)
  32. for {
  33. req, err := stream.Recv()
  34. if err == io.EOF {
  35. return nil
  36. }
  37. if err != nil {
  38. return err
  39. }
  40. var prefix bool
  41. toWatch := req.Key
  42. if len(req.Key) == 0 {
  43. toWatch = req.Prefix
  44. prefix = true
  45. }
  46. // TODO: support cancellation
  47. watcher.Watch(toWatch, prefix, req.StartRevision)
  48. }
  49. }
  50. func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
  51. for {
  52. select {
  53. case e, ok := <-watcher.Chan():
  54. if !ok {
  55. return
  56. }
  57. err := stream.Send(&pb.WatchResponse{Event: &e})
  58. storage.ReportEventReceived()
  59. if err != nil {
  60. return
  61. }
  62. case <-closec:
  63. // drain the chan to clean up pending events
  64. for {
  65. _, ok := <-watcher.Chan()
  66. if !ok {
  67. return
  68. }
  69. storage.ReportEventReceived()
  70. }
  71. }
  72. }
  73. }