Browse Source

grpcproxy: handle overloaded stream

Xiang Li 9 years ago
parent
commit
b56ee178d5
2 changed files with 7 additions and 3 deletions
  1. 1 1
      proxy/grpcproxy/watch.go
  2. 6 2
      proxy/grpcproxy/watcher.go

+ 1 - 1
proxy/grpcproxy/watch.go

@@ -67,7 +67,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		id:         wp.nextStreamID,
 		id:         wp.nextStreamID,
 		gRPCStream: stream,
 		gRPCStream: stream,
 
 
-		watchCh: make(chan *pb.WatchResponse, 10),
+		watchCh: make(chan *pb.WatchResponse, 1024),
 
 
 		proxyCtx: wp.ctx,
 		proxyCtx: wp.ctx,
 	}
 	}

+ 6 - 2
proxy/grpcproxy/watcher.go

@@ -15,6 +15,8 @@
 package grpcproxy
 package grpcproxy
 
 
 import (
 import (
+	"time"
+
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc"
@@ -86,7 +88,9 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 	}
 	}
 	select {
 	select {
 	case w.ch <- pbwr:
 	case w.ch <- pbwr:
-	default:
-		panic("handle this")
+	case <-time.After(50 * time.Millisecond):
+		// close the watch chan will notify the stream sender.
+		// the stream will gc all its watchers.
+		close(w.ch)
 	}
 	}
 }
 }