Ver Fonte

clientv3: use FromOutgoingContext to bucket watches

Watches were bucketed on string(ctx) for historical reasons;
metadata.FromOutgoingContext should be enough to key watches now.

Fixes #8338
Anthony Romano há 8 anos atrás
pai
commit
1c75c383a1
1 ficheiros alterados com 16 adições e 9 exclusões
  1. 16 9
      clientv3/watch.go

+ 16 - 9
clientv3/watch.go

@@ -25,6 +25,7 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
 )
 
 const (
@@ -213,16 +214,15 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 		owner:      w,
 		remote:     w.remote,
 		ctx:        ctx,
-		ctxKey:     fmt.Sprintf("%v", inctx),
+		ctxKey:     streamKeyFromCtx(inctx),
 		cancel:     cancel,
 		substreams: make(map[int64]*watcherStream),
-
-		respc:    make(chan *pb.WatchResponse),
-		reqc:     make(chan *watchRequest),
-		donec:    make(chan struct{}),
-		errc:     make(chan error, 1),
-		closingc: make(chan *watcherStream),
-		resumec:  make(chan struct{}),
+		respc:      make(chan *pb.WatchResponse),
+		reqc:       make(chan *watchRequest),
+		donec:      make(chan struct{}),
+		errc:       make(chan error, 1),
+		closingc:   make(chan *watcherStream),
+		resumec:    make(chan struct{}),
 	}
 	go wgs.run()
 	return wgs
@@ -253,7 +253,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	}
 
 	ok := false
-	ctxKey := fmt.Sprintf("%v", ctx)
+	ctxKey := streamKeyFromCtx(ctx)
 
 	// find or allocate appropriate grpc watch stream
 	w.mu.Lock()
@@ -794,3 +794,10 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	return &pb.WatchRequest{RequestUnion: cr}
 }
+
+func streamKeyFromCtx(ctx context.Context) string {
+	if md, ok := metadata.FromOutgoingContext(ctx); ok {
+		return fmt.Sprintf("%+v", md)
+	}
+	return ""
+}