watch.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  20. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  21. mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
  22. "golang.org/x/net/context"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/codes"
  25. )
  26. const (
  27. EventTypeDelete = mvccpb.DELETE
  28. EventTypePut = mvccpb.PUT
  29. closeSendErrTimeout = 250 * time.Millisecond
  30. )
  31. type Event mvccpb.Event
  32. type WatchChan <-chan WatchResponse
  33. type Watcher interface {
  34. // Watch watches on a key or prefix. The watched events will be returned
  35. // through the returned channel. If revisions waiting to be sent over the
  36. // watch are compacted, then the watch will be canceled by the server, the
  37. // client will post a compacted error watch response, and the channel will close.
  38. Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
  39. // Close closes the watcher and cancels all watch requests.
  40. Close() error
  41. }
  42. type WatchResponse struct {
  43. Header pb.ResponseHeader
  44. Events []*Event
  45. // CompactRevision is the minimum revision the watcher may receive.
  46. CompactRevision int64
  47. // Canceled is used to indicate watch failure.
  48. // If the watch failed and the stream was about to close, before the channel is closed,
  49. // the channel sends a final response that has Canceled set to true with a non-nil Err().
  50. Canceled bool
  51. // Created is used to indicate the creation of the watcher.
  52. Created bool
  53. closeErr error
  54. // cancelReason is a reason of canceling watch
  55. cancelReason string
  56. }
  57. // IsCreate returns true if the event tells that the key is newly created.
  58. func (e *Event) IsCreate() bool {
  59. return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
  60. }
  61. // IsModify returns true if the event tells that a new value is put on existing key.
  62. func (e *Event) IsModify() bool {
  63. return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
  64. }
  65. // Err is the error value if this WatchResponse holds an error.
  66. func (wr *WatchResponse) Err() error {
  67. switch {
  68. case wr.closeErr != nil:
  69. return v3rpc.Error(wr.closeErr)
  70. case wr.CompactRevision != 0:
  71. return v3rpc.ErrCompacted
  72. case wr.Canceled:
  73. if len(wr.cancelReason) != 0 {
  74. return v3rpc.Error(grpc.Errorf(codes.FailedPrecondition, "%s", wr.cancelReason))
  75. }
  76. return v3rpc.ErrFutureRev
  77. }
  78. return nil
  79. }
  80. // IsProgressNotify returns true if the WatchResponse is progress notification.
  81. func (wr *WatchResponse) IsProgressNotify() bool {
  82. return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
  83. }
  84. // watcher implements the Watcher interface
  85. type watcher struct {
  86. remote pb.WatchClient
  87. // mu protects the grpc streams map
  88. mu sync.RWMutex
  89. // streams holds all the active grpc streams keyed by ctx value.
  90. streams map[string]*watchGrpcStream
  91. }
  92. // watchGrpcStream tracks all watch resources attached to a single grpc stream.
  93. type watchGrpcStream struct {
  94. owner *watcher
  95. remote pb.WatchClient
  96. // ctx controls internal remote.Watch requests
  97. ctx context.Context
  98. // ctxKey is the key used when looking up this stream's context
  99. ctxKey string
  100. cancel context.CancelFunc
  101. // substreams holds all active watchers on this grpc stream
  102. substreams map[int64]*watcherStream
  103. // resuming holds all resuming watchers on this grpc stream
  104. resuming []*watcherStream
  105. // reqc sends a watch request from Watch() to the main goroutine
  106. reqc chan *watchRequest
  107. // respc receives data from the watch client
  108. respc chan *pb.WatchResponse
  109. // donec closes to broadcast shutdown
  110. donec chan struct{}
  111. // errc transmits errors from grpc Recv to the watch stream reconn logic
  112. errc chan error
  113. // closingc gets the watcherStream of closing watchers
  114. closingc chan *watcherStream
  115. // wg is Done when all substream goroutines have exited
  116. wg sync.WaitGroup
  117. // resumec closes to signal that all substreams should begin resuming
  118. resumec chan struct{}
  119. // closeErr is the error that closed the watch stream
  120. closeErr error
  121. }
  122. // watchRequest is issued by the subscriber to start a new watcher
  123. type watchRequest struct {
  124. ctx context.Context
  125. key string
  126. end string
  127. rev int64
  128. // send created notification event if this field is true
  129. createdNotify bool
  130. // progressNotify is for progress updates
  131. progressNotify bool
  132. // filters is the list of events to filter out
  133. filters []pb.WatchCreateRequest_FilterType
  134. // get the previous key-value pair before the event happens
  135. prevKV bool
  136. // retc receives a chan WatchResponse once the watcher is established
  137. retc chan chan WatchResponse
  138. }
  139. // watcherStream represents a registered watcher
  140. type watcherStream struct {
  141. // initReq is the request that initiated this request
  142. initReq watchRequest
  143. // outc publishes watch responses to subscriber
  144. outc chan WatchResponse
  145. // recvc buffers watch responses before publishing
  146. recvc chan *WatchResponse
  147. // donec closes when the watcherStream goroutine stops.
  148. donec chan struct{}
  149. // closing is set to true when stream should be scheduled to shutdown.
  150. closing bool
  151. // id is the registered watch id on the grpc stream
  152. id int64
  153. // buf holds all events received from etcd but not yet consumed by the client
  154. buf []*WatchResponse
  155. }
  156. func NewWatcher(c *Client) Watcher {
  157. return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
  158. }
  159. func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
  160. return &watcher{
  161. remote: wc,
  162. streams: make(map[string]*watchGrpcStream),
  163. }
  164. }
  165. // never closes
  166. var valCtxCh = make(chan struct{})
  167. var zeroTime = time.Unix(0, 0)
  168. // ctx with only the values; never Done
  169. type valCtx struct{ context.Context }
  170. func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
  171. func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
  172. func (vc *valCtx) Err() error { return nil }
  173. func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
  174. ctx, cancel := context.WithCancel(&valCtx{inctx})
  175. wgs := &watchGrpcStream{
  176. owner: w,
  177. remote: w.remote,
  178. ctx: ctx,
  179. ctxKey: fmt.Sprintf("%v", inctx),
  180. cancel: cancel,
  181. substreams: make(map[int64]*watcherStream),
  182. respc: make(chan *pb.WatchResponse),
  183. reqc: make(chan *watchRequest),
  184. donec: make(chan struct{}),
  185. errc: make(chan error, 1),
  186. closingc: make(chan *watcherStream),
  187. resumec: make(chan struct{}),
  188. }
  189. go wgs.run()
  190. return wgs
  191. }
  192. // Watch posts a watch request to run() and waits for a new watcher channel
  193. func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
  194. ow := opWatch(key, opts...)
  195. var filters []pb.WatchCreateRequest_FilterType
  196. if ow.filterPut {
  197. filters = append(filters, pb.WatchCreateRequest_NOPUT)
  198. }
  199. if ow.filterDelete {
  200. filters = append(filters, pb.WatchCreateRequest_NODELETE)
  201. }
  202. wr := &watchRequest{
  203. ctx: ctx,
  204. createdNotify: ow.createdNotify,
  205. key: string(ow.key),
  206. end: string(ow.end),
  207. rev: ow.rev,
  208. progressNotify: ow.progressNotify,
  209. filters: filters,
  210. prevKV: ow.prevKV,
  211. retc: make(chan chan WatchResponse, 1),
  212. }
  213. ok := false
  214. ctxKey := fmt.Sprintf("%v", ctx)
  215. // find or allocate appropriate grpc watch stream
  216. w.mu.Lock()
  217. if w.streams == nil {
  218. // closed
  219. w.mu.Unlock()
  220. ch := make(chan WatchResponse)
  221. close(ch)
  222. return ch
  223. }
  224. wgs := w.streams[ctxKey]
  225. if wgs == nil {
  226. wgs = w.newWatcherGrpcStream(ctx)
  227. w.streams[ctxKey] = wgs
  228. }
  229. donec := wgs.donec
  230. reqc := wgs.reqc
  231. w.mu.Unlock()
  232. // couldn't create channel; return closed channel
  233. closeCh := make(chan WatchResponse, 1)
  234. // submit request
  235. select {
  236. case reqc <- wr:
  237. ok = true
  238. case <-wr.ctx.Done():
  239. case <-donec:
  240. if wgs.closeErr != nil {
  241. closeCh <- WatchResponse{closeErr: wgs.closeErr}
  242. break
  243. }
  244. // retry; may have dropped stream from no ctxs
  245. return w.Watch(ctx, key, opts...)
  246. }
  247. // receive channel
  248. if ok {
  249. select {
  250. case ret := <-wr.retc:
  251. return ret
  252. case <-ctx.Done():
  253. case <-donec:
  254. if wgs.closeErr != nil {
  255. closeCh <- WatchResponse{closeErr: wgs.closeErr}
  256. break
  257. }
  258. // retry; may have dropped stream from no ctxs
  259. return w.Watch(ctx, key, opts...)
  260. }
  261. }
  262. close(closeCh)
  263. return closeCh
  264. }
  265. func (w *watcher) Close() (err error) {
  266. w.mu.Lock()
  267. streams := w.streams
  268. w.streams = nil
  269. w.mu.Unlock()
  270. for _, wgs := range streams {
  271. if werr := wgs.close(); werr != nil {
  272. err = werr
  273. }
  274. }
  275. return err
  276. }
  277. func (w *watchGrpcStream) close() (err error) {
  278. w.cancel()
  279. <-w.donec
  280. select {
  281. case err = <-w.errc:
  282. default:
  283. }
  284. return toErr(w.ctx, err)
  285. }
  286. func (w *watcher) closeStream(wgs *watchGrpcStream) {
  287. w.mu.Lock()
  288. close(wgs.donec)
  289. wgs.cancel()
  290. if w.streams != nil {
  291. delete(w.streams, wgs.ctxKey)
  292. }
  293. w.mu.Unlock()
  294. }
  295. func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
  296. if resp.WatchId == -1 {
  297. // failed; no channel
  298. close(ws.recvc)
  299. return
  300. }
  301. ws.id = resp.WatchId
  302. w.substreams[ws.id] = ws
  303. }
  304. func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
  305. select {
  306. case ws.outc <- *resp:
  307. case <-ws.initReq.ctx.Done():
  308. case <-time.After(closeSendErrTimeout):
  309. }
  310. close(ws.outc)
  311. }
  312. func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
  313. // send channel response in case stream was never established
  314. select {
  315. case ws.initReq.retc <- ws.outc:
  316. default:
  317. }
  318. // close subscriber's channel
  319. if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
  320. go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
  321. } else if ws.outc != nil {
  322. close(ws.outc)
  323. }
  324. if ws.id != -1 {
  325. delete(w.substreams, ws.id)
  326. return
  327. }
  328. for i := range w.resuming {
  329. if w.resuming[i] == ws {
  330. w.resuming[i] = nil
  331. return
  332. }
  333. }
  334. }
  335. // run is the root of the goroutines for managing a watcher client
  336. func (w *watchGrpcStream) run() {
  337. var wc pb.Watch_WatchClient
  338. var closeErr error
  339. // substreams marked to close but goroutine still running; needed for
  340. // avoiding double-closing recvc on grpc stream teardown
  341. closing := make(map[*watcherStream]struct{})
  342. defer func() {
  343. w.closeErr = closeErr
  344. // shutdown substreams and resuming substreams
  345. for _, ws := range w.substreams {
  346. if _, ok := closing[ws]; !ok {
  347. close(ws.recvc)
  348. closing[ws] = struct{}{}
  349. }
  350. }
  351. for _, ws := range w.resuming {
  352. if _, ok := closing[ws]; ws != nil && !ok {
  353. close(ws.recvc)
  354. closing[ws] = struct{}{}
  355. }
  356. }
  357. w.joinSubstreams()
  358. for range closing {
  359. w.closeSubstream(<-w.closingc)
  360. }
  361. w.wg.Wait()
  362. w.owner.closeStream(w)
  363. }()
  364. // start a stream with the etcd grpc server
  365. if wc, closeErr = w.newWatchClient(); closeErr != nil {
  366. return
  367. }
  368. cancelSet := make(map[int64]struct{})
  369. for {
  370. select {
  371. // Watch() requested
  372. case wreq := <-w.reqc:
  373. outc := make(chan WatchResponse, 1)
  374. ws := &watcherStream{
  375. initReq: *wreq,
  376. id: -1,
  377. outc: outc,
  378. // unbufffered so resumes won't cause repeat events
  379. recvc: make(chan *WatchResponse),
  380. }
  381. ws.donec = make(chan struct{})
  382. w.wg.Add(1)
  383. go w.serveSubstream(ws, w.resumec)
  384. // queue up for watcher creation/resume
  385. w.resuming = append(w.resuming, ws)
  386. if len(w.resuming) == 1 {
  387. // head of resume queue, can register a new watcher
  388. wc.Send(ws.initReq.toPB())
  389. }
  390. // New events from the watch client
  391. case pbresp := <-w.respc:
  392. switch {
  393. case pbresp.Created:
  394. // response to head of queue creation
  395. if ws := w.resuming[0]; ws != nil {
  396. w.addSubstream(pbresp, ws)
  397. w.dispatchEvent(pbresp)
  398. w.resuming[0] = nil
  399. }
  400. if ws := w.nextResume(); ws != nil {
  401. wc.Send(ws.initReq.toPB())
  402. }
  403. case pbresp.Canceled && pbresp.CompactRevision == 0:
  404. delete(cancelSet, pbresp.WatchId)
  405. if ws, ok := w.substreams[pbresp.WatchId]; ok {
  406. // signal to stream goroutine to update closingc
  407. close(ws.recvc)
  408. closing[ws] = struct{}{}
  409. }
  410. default:
  411. // dispatch to appropriate watch stream
  412. if ok := w.dispatchEvent(pbresp); ok {
  413. break
  414. }
  415. // watch response on unexpected watch id; cancel id
  416. if _, ok := cancelSet[pbresp.WatchId]; ok {
  417. break
  418. }
  419. cancelSet[pbresp.WatchId] = struct{}{}
  420. cr := &pb.WatchRequest_CancelRequest{
  421. CancelRequest: &pb.WatchCancelRequest{
  422. WatchId: pbresp.WatchId,
  423. },
  424. }
  425. req := &pb.WatchRequest{RequestUnion: cr}
  426. wc.Send(req)
  427. }
  428. // watch client failed to recv; spawn another if possible
  429. case err := <-w.errc:
  430. if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
  431. closeErr = err
  432. return
  433. }
  434. if wc, closeErr = w.newWatchClient(); closeErr != nil {
  435. return
  436. }
  437. if ws := w.nextResume(); ws != nil {
  438. wc.Send(ws.initReq.toPB())
  439. }
  440. cancelSet = make(map[int64]struct{})
  441. case <-w.ctx.Done():
  442. return
  443. case ws := <-w.closingc:
  444. w.closeSubstream(ws)
  445. delete(closing, ws)
  446. if len(w.substreams)+len(w.resuming) == 0 {
  447. // no more watchers on this stream, shutdown
  448. return
  449. }
  450. }
  451. }
  452. }
  453. // nextResume chooses the next resuming to register with the grpc stream. Abandoned
  454. // streams are marked as nil in the queue since the head must wait for its inflight registration.
  455. func (w *watchGrpcStream) nextResume() *watcherStream {
  456. for len(w.resuming) != 0 {
  457. if w.resuming[0] != nil {
  458. return w.resuming[0]
  459. }
  460. w.resuming = w.resuming[1:len(w.resuming)]
  461. }
  462. return nil
  463. }
  464. // dispatchEvent sends a WatchResponse to the appropriate watcher stream
  465. func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
  466. events := make([]*Event, len(pbresp.Events))
  467. for i, ev := range pbresp.Events {
  468. events[i] = (*Event)(ev)
  469. }
  470. wr := &WatchResponse{
  471. Header: *pbresp.Header,
  472. Events: events,
  473. CompactRevision: pbresp.CompactRevision,
  474. Created: pbresp.Created,
  475. Canceled: pbresp.Canceled,
  476. cancelReason: pbresp.CancelReason,
  477. }
  478. ws, ok := w.substreams[pbresp.WatchId]
  479. if !ok {
  480. return false
  481. }
  482. select {
  483. case ws.recvc <- wr:
  484. case <-ws.donec:
  485. return false
  486. }
  487. return true
  488. }
  489. // serveWatchClient forwards messages from the grpc stream to run()
  490. func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
  491. for {
  492. resp, err := wc.Recv()
  493. if err != nil {
  494. select {
  495. case w.errc <- err:
  496. case <-w.donec:
  497. }
  498. return
  499. }
  500. select {
  501. case w.respc <- resp:
  502. case <-w.donec:
  503. return
  504. }
  505. }
  506. }
  507. // serveSubstream forwards watch responses from run() to the subscriber
  508. func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
  509. if ws.closing {
  510. panic("created substream goroutine but substream is closing")
  511. }
  512. // nextRev is the minimum expected next revision
  513. nextRev := ws.initReq.rev
  514. resuming := false
  515. defer func() {
  516. if !resuming {
  517. ws.closing = true
  518. }
  519. close(ws.donec)
  520. if !resuming {
  521. w.closingc <- ws
  522. }
  523. w.wg.Done()
  524. }()
  525. emptyWr := &WatchResponse{}
  526. for {
  527. curWr := emptyWr
  528. outc := ws.outc
  529. if len(ws.buf) > 0 {
  530. curWr = ws.buf[0]
  531. } else {
  532. outc = nil
  533. }
  534. select {
  535. case outc <- *curWr:
  536. if ws.buf[0].Err() != nil {
  537. return
  538. }
  539. ws.buf[0] = nil
  540. ws.buf = ws.buf[1:]
  541. case wr, ok := <-ws.recvc:
  542. if !ok {
  543. // shutdown from closeSubstream
  544. return
  545. }
  546. if wr.Created {
  547. if ws.initReq.retc != nil {
  548. ws.initReq.retc <- ws.outc
  549. // to prevent next write from taking the slot in buffered channel
  550. // and posting duplicate create events
  551. ws.initReq.retc = nil
  552. // send first creation event only if requested
  553. if ws.initReq.createdNotify {
  554. ws.outc <- *wr
  555. }
  556. // once the watch channel is returned, a current revision
  557. // watch must resume at the store revision. This is necessary
  558. // for the following case to work as expected:
  559. // wch := m1.Watch("a")
  560. // m2.Put("a", "b")
  561. // <-wch
  562. // If the revision is only bound on the first observed event,
  563. // if wch is disconnected before the Put is issued, then reconnects
  564. // after it is committed, it'll miss the Put.
  565. if ws.initReq.rev == 0 {
  566. nextRev = wr.Header.Revision
  567. }
  568. }
  569. } else {
  570. // current progress of watch; <= store revision
  571. nextRev = wr.Header.Revision
  572. }
  573. if len(wr.Events) > 0 {
  574. nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
  575. }
  576. ws.initReq.rev = nextRev
  577. // created event is already sent above,
  578. // watcher should not post duplicate events
  579. if wr.Created {
  580. continue
  581. }
  582. // TODO pause channel if buffer gets too large
  583. ws.buf = append(ws.buf, wr)
  584. case <-w.ctx.Done():
  585. return
  586. case <-ws.initReq.ctx.Done():
  587. return
  588. case <-resumec:
  589. resuming = true
  590. return
  591. }
  592. }
  593. // lazily send cancel message if events on missing id
  594. }
  595. func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
  596. // mark all substreams as resuming
  597. close(w.resumec)
  598. w.resumec = make(chan struct{})
  599. w.joinSubstreams()
  600. for _, ws := range w.substreams {
  601. ws.id = -1
  602. w.resuming = append(w.resuming, ws)
  603. }
  604. // strip out nils, if any
  605. var resuming []*watcherStream
  606. for _, ws := range w.resuming {
  607. if ws != nil {
  608. resuming = append(resuming, ws)
  609. }
  610. }
  611. w.resuming = resuming
  612. w.substreams = make(map[int64]*watcherStream)
  613. // connect to grpc stream while accepting watcher cancelation
  614. stopc := make(chan struct{})
  615. donec := w.waitCancelSubstreams(stopc)
  616. wc, err := w.openWatchClient()
  617. close(stopc)
  618. <-donec
  619. // serve all non-closing streams, even if there's a client error
  620. // so that the teardown path can shutdown the streams as expected.
  621. for _, ws := range w.resuming {
  622. if ws.closing {
  623. continue
  624. }
  625. ws.donec = make(chan struct{})
  626. w.wg.Add(1)
  627. go w.serveSubstream(ws, w.resumec)
  628. }
  629. if err != nil {
  630. return nil, v3rpc.Error(err)
  631. }
  632. // receive data from new grpc stream
  633. go w.serveWatchClient(wc)
  634. return wc, nil
  635. }
  636. func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
  637. var wg sync.WaitGroup
  638. wg.Add(len(w.resuming))
  639. donec := make(chan struct{})
  640. for i := range w.resuming {
  641. go func(ws *watcherStream) {
  642. defer wg.Done()
  643. if ws.closing {
  644. if ws.initReq.ctx.Err() != nil && ws.outc != nil {
  645. close(ws.outc)
  646. ws.outc = nil
  647. }
  648. return
  649. }
  650. select {
  651. case <-ws.initReq.ctx.Done():
  652. // closed ws will be removed from resuming
  653. ws.closing = true
  654. close(ws.outc)
  655. ws.outc = nil
  656. w.wg.Add(1)
  657. go func() {
  658. defer w.wg.Done()
  659. w.closingc <- ws
  660. }()
  661. case <-stopc:
  662. }
  663. }(w.resuming[i])
  664. }
  665. go func() {
  666. defer close(donec)
  667. wg.Wait()
  668. }()
  669. return donec
  670. }
  671. // joinSubstream waits for all substream goroutines to complete
  672. func (w *watchGrpcStream) joinSubstreams() {
  673. for _, ws := range w.substreams {
  674. <-ws.donec
  675. }
  676. for _, ws := range w.resuming {
  677. if ws != nil {
  678. <-ws.donec
  679. }
  680. }
  681. }
  682. // openWatchClient retries opening a watchclient until retryConnection fails
  683. func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
  684. for {
  685. select {
  686. case <-w.ctx.Done():
  687. if err == nil {
  688. return nil, w.ctx.Err()
  689. }
  690. return nil, err
  691. default:
  692. }
  693. if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
  694. break
  695. }
  696. if isHaltErr(w.ctx, err) {
  697. return nil, v3rpc.Error(err)
  698. }
  699. }
  700. return ws, nil
  701. }
  702. // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
  703. func (wr *watchRequest) toPB() *pb.WatchRequest {
  704. req := &pb.WatchCreateRequest{
  705. StartRevision: wr.rev,
  706. Key: []byte(wr.key),
  707. RangeEnd: []byte(wr.end),
  708. ProgressNotify: wr.progressNotify,
  709. Filters: wr.filters,
  710. PrevKv: wr.prevKV,
  711. }
  712. cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
  713. return &pb.WatchRequest{RequestUnion: cr}
  714. }