chan_stream.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package adapter
  15. import (
  16. "context"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/codes"
  19. "google.golang.org/grpc/metadata"
  20. "google.golang.org/grpc/status"
  21. )
  22. // chanServerStream implements grpc.ServerStream with a chanStream
  23. type chanServerStream struct {
  24. headerc chan<- metadata.MD
  25. trailerc chan<- metadata.MD
  26. grpc.Stream
  27. headers []metadata.MD
  28. }
  29. func (ss *chanServerStream) SendHeader(md metadata.MD) error {
  30. if ss.headerc == nil {
  31. return errAlreadySentHeader
  32. }
  33. outmd := make(map[string][]string)
  34. for _, h := range append(ss.headers, md) {
  35. for k, v := range h {
  36. outmd[k] = v
  37. }
  38. }
  39. select {
  40. case ss.headerc <- outmd:
  41. ss.headerc = nil
  42. ss.headers = nil
  43. return nil
  44. case <-ss.Context().Done():
  45. }
  46. return ss.Context().Err()
  47. }
  48. func (ss *chanServerStream) SetHeader(md metadata.MD) error {
  49. if ss.headerc == nil {
  50. return errAlreadySentHeader
  51. }
  52. ss.headers = append(ss.headers, md)
  53. return nil
  54. }
  55. func (ss *chanServerStream) SetTrailer(md metadata.MD) {
  56. ss.trailerc <- md
  57. }
  58. // chanClientStream implements grpc.ClientStream with a chanStream
  59. type chanClientStream struct {
  60. headerc <-chan metadata.MD
  61. trailerc <-chan metadata.MD
  62. *chanStream
  63. }
  64. func (cs *chanClientStream) Header() (metadata.MD, error) {
  65. select {
  66. case md := <-cs.headerc:
  67. return md, nil
  68. case <-cs.Context().Done():
  69. }
  70. return nil, cs.Context().Err()
  71. }
  72. func (cs *chanClientStream) Trailer() metadata.MD {
  73. select {
  74. case md := <-cs.trailerc:
  75. return md
  76. case <-cs.Context().Done():
  77. return nil
  78. }
  79. }
  80. func (cs *chanClientStream) CloseSend() error {
  81. close(cs.chanStream.sendc)
  82. return nil
  83. }
  84. // chanStream implements grpc.Stream using channels
  85. type chanStream struct {
  86. recvc <-chan interface{}
  87. sendc chan<- interface{}
  88. ctx context.Context
  89. cancel context.CancelFunc
  90. }
  91. func (s *chanStream) Context() context.Context { return s.ctx }
  92. func (s *chanStream) SendMsg(m interface{}) error {
  93. select {
  94. case s.sendc <- m:
  95. if err, ok := m.(error); ok {
  96. return err
  97. }
  98. return nil
  99. case <-s.ctx.Done():
  100. }
  101. return s.ctx.Err()
  102. }
  103. func (s *chanStream) RecvMsg(m interface{}) error {
  104. v := m.(*interface{})
  105. for {
  106. select {
  107. case msg, ok := <-s.recvc:
  108. if !ok {
  109. return status.Error(codes.Canceled, "the client connection is closing")
  110. }
  111. if err, ok := msg.(error); ok {
  112. return err
  113. }
  114. *v = msg
  115. return nil
  116. case <-s.ctx.Done():
  117. }
  118. if len(s.recvc) == 0 {
  119. // prioritize any pending recv messages over canceled context
  120. break
  121. }
  122. }
  123. return s.ctx.Err()
  124. }
  125. func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream {
  126. // ch1 is buffered so server can send error on close
  127. ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
  128. headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
  129. cctx, ccancel := context.WithCancel(ctx)
  130. cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
  131. cs := chanClientStream{headerc, trailerc, cli}
  132. sctx, scancel := context.WithCancel(ctx)
  133. srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
  134. ss := chanServerStream{headerc, trailerc, srv, nil}
  135. go func() {
  136. if err := ssHandler(ss); err != nil {
  137. select {
  138. case srv.sendc <- err:
  139. case <-sctx.Done():
  140. case <-cctx.Done():
  141. }
  142. }
  143. scancel()
  144. ccancel()
  145. }()
  146. return cs
  147. }