|
@@ -19,6 +19,7 @@ import (
|
|
|
|
|
|
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
"github.com/coreos/etcd/storage"
|
|
"github.com/coreos/etcd/storage"
|
|
|
|
|
+ "github.com/coreos/etcd/storage/storagepb"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
type watchServer struct {
|
|
type watchServer struct {
|
|
@@ -61,15 +62,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|
|
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
|
|
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
- case e, ok := <-watcher.Chan():
|
|
|
|
|
|
|
+ case evs, ok := <-watcher.Chan():
|
|
|
if !ok {
|
|
if !ok {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- err := stream.Send(&pb.WatchResponse{Event: &e})
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // TODO: evs is []storagepb.Event type
|
|
|
|
|
+ // either return []*storagepb.Event from storage package
|
|
|
|
|
+ // or define protocol buffer with []storagepb.Event.
|
|
|
|
|
+ events := make([]*storagepb.Event, len(evs))
|
|
|
|
|
+ for i := range evs {
|
|
|
|
|
+ events[i] = &evs[i]
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ err := stream.Send(&pb.WatchResponse{Events: events})
|
|
|
storage.ReportEventReceived()
|
|
storage.ReportEventReceived()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
case <-closec:
|
|
case <-closec:
|
|
|
// drain the chan to clean up pending events
|
|
// drain the chan to clean up pending events
|
|
|
for {
|
|
for {
|