remote_client.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "sync"
  17. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  18. "golang.org/x/net/context"
  19. "google.golang.org/grpc"
  20. )
  21. type remoteClient struct {
  22. client *Client
  23. conn *grpc.ClientConn
  24. updateConn func(*grpc.ClientConn)
  25. mu sync.Mutex
  26. }
  27. func newRemoteClient(client *Client, update func(*grpc.ClientConn)) *remoteClient {
  28. ret := &remoteClient{
  29. client: client,
  30. conn: client.ActiveConnection(),
  31. updateConn: update,
  32. }
  33. ret.mu.Lock()
  34. defer ret.mu.Unlock()
  35. ret.updateConn(ret.conn)
  36. return ret
  37. }
  38. // reconnectWait reconnects the client, returning when connection establishes/fails.
  39. func (r *remoteClient) reconnectWait(ctx context.Context, prevErr error) error {
  40. r.mu.Lock()
  41. updated := r.tryUpdate()
  42. r.mu.Unlock()
  43. if updated {
  44. return nil
  45. }
  46. conn, err := r.client.connWait(ctx, prevErr)
  47. if err == nil {
  48. r.mu.Lock()
  49. r.conn = conn
  50. r.updateConn(conn)
  51. r.mu.Unlock()
  52. }
  53. return err
  54. }
  55. // reconnect will reconnect the client without waiting
  56. func (r *remoteClient) reconnect(err error) {
  57. r.mu.Lock()
  58. defer r.mu.Unlock()
  59. if r.tryUpdate() {
  60. return
  61. }
  62. r.client.connStartRetry(err)
  63. }
  64. func (r *remoteClient) tryUpdate() bool {
  65. activeConn := r.client.ActiveConnection()
  66. if activeConn == nil || activeConn == r.conn {
  67. return false
  68. }
  69. r.conn = activeConn
  70. r.updateConn(activeConn)
  71. return true
  72. }
  73. func (r *remoteClient) acquire(ctx context.Context) error {
  74. for {
  75. r.client.mu.RLock()
  76. closed := r.client.cancel == nil
  77. c := r.client.conn
  78. r.mu.Lock()
  79. match := r.conn == c
  80. r.mu.Unlock()
  81. if closed {
  82. return rpctypes.ErrConnClosed
  83. }
  84. if match {
  85. return nil
  86. }
  87. r.client.mu.RUnlock()
  88. if err := r.reconnectWait(ctx, nil); err != nil {
  89. return err
  90. }
  91. }
  92. }
  93. func (r *remoteClient) release() { r.client.mu.RUnlock() }