|
|
@@ -123,10 +123,9 @@ type serverHandlerTransport struct {
|
|
|
// when WriteStatus is called.
|
|
|
writes chan func()
|
|
|
|
|
|
- mu sync.Mutex
|
|
|
- // streamDone indicates whether WriteStatus has been called and writes channel
|
|
|
- // has been closed.
|
|
|
- streamDone bool
|
|
|
+ // block concurrent WriteStatus calls
|
|
|
+ // e.g. grpc/(*serverStream).SendMsg/RecvMsg
|
|
|
+ writeStatusMu sync.Mutex
|
|
|
}
|
|
|
|
|
|
func (ht *serverHandlerTransport) Close() error {
|
|
|
@@ -177,13 +176,9 @@ func (ht *serverHandlerTransport) do(fn func()) error {
|
|
|
}
|
|
|
|
|
|
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
|
|
|
- ht.mu.Lock()
|
|
|
- if ht.streamDone {
|
|
|
- ht.mu.Unlock()
|
|
|
- return nil
|
|
|
- }
|
|
|
- ht.streamDone = true
|
|
|
- ht.mu.Unlock()
|
|
|
+ ht.writeStatusMu.Lock()
|
|
|
+ defer ht.writeStatusMu.Unlock()
|
|
|
+
|
|
|
err := ht.do(func() {
|
|
|
ht.writeCommonHeaders(s)
|
|
|
|
|
|
@@ -222,7 +217,11 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
|
|
}
|
|
|
}
|
|
|
})
|
|
|
- close(ht.writes)
|
|
|
+
|
|
|
+ if err == nil { // transport has not been closed
|
|
|
+ ht.Close()
|
|
|
+ close(ht.writes)
|
|
|
+ }
|
|
|
return err
|
|
|
}
|
|
|
|