watcher.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "github.com/coreos/etcd/clientv3"
  17. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  18. "github.com/coreos/etcd/mvcc"
  19. "github.com/coreos/etcd/mvcc/mvccpb"
  20. )
  21. type watchRange struct {
  22. key, end string
  23. }
  24. type watcher struct {
  25. id int64
  26. wr watchRange
  27. rev int64
  28. filters []mvcc.FilterFunc
  29. progress bool
  30. ch chan<- *pb.WatchResponse
  31. }
  32. func (w *watcher) send(wr clientv3.WatchResponse) {
  33. if wr.IsProgressNotify() && !w.progress {
  34. return
  35. }
  36. events := make([]*mvccpb.Event, 0, len(wr.Events))
  37. for i := range wr.Events {
  38. ev := (*mvccpb.Event)(wr.Events[i])
  39. if ev.Kv.ModRevision <= w.rev {
  40. continue
  41. } else {
  42. w.rev = ev.Kv.ModRevision
  43. }
  44. filtered := false
  45. if len(w.filters) != 0 {
  46. for _, filter := range w.filters {
  47. if filter(*ev) {
  48. filtered = true
  49. break
  50. }
  51. }
  52. }
  53. if !filtered {
  54. events = append(events, ev)
  55. }
  56. }
  57. // all events are filtered out?
  58. if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 {
  59. return
  60. }
  61. pbwr := &pb.WatchResponse{
  62. Header: &wr.Header,
  63. Created: wr.Created,
  64. WatchId: w.id,
  65. Events: events,
  66. }
  67. select {
  68. case w.ch <- pbwr:
  69. default:
  70. panic("handle this")
  71. }
  72. }