client.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  import (
  	"errors"
  	"net"
  	"net/url"
  	"sync"
  	"time"

  	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
  	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
  	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
  	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  	"github.com/coreos/etcd/pkg/transport"
  )
  26. // Client provides and manages an etcd v3 client session.
  27. type Client struct {
  28. // KV is the keyvalue API for the client's connection.
  29. KV pb.KVClient
  30. // Lease is the lease API for the client's connection.
  31. Lease pb.LeaseClient
  32. // Watch is the watch API for the client's connection.
  33. Watch pb.WatchClient
  34. // Cluster is the cluster API for the client's connection.
  35. Cluster pb.ClusterClient
  36. conn *grpc.ClientConn
  37. cfg Config
  38. creds *credentials.TransportAuthenticator
  39. mu sync.RWMutex // protects connection selection and error list
  40. errors []error // errors passed to retryConnection
  41. }
  42. // EndpointDialer is a policy for choosing which endpoint to dial next
  43. type EndpointDialer func(*Client) (*grpc.ClientConn, error)
  44. type Config struct {
  45. // Endpoints is a list of URLs
  46. Endpoints []string
  47. // RetryDialer chooses the next endpoint to use
  48. RetryDialer EndpointDialer
  49. // DialTimeout is the timeout for failing to establish a connection.
  50. DialTimeout time.Duration
  51. // TLS holds the client secure credentials, if any.
  52. TLS *transport.TLSInfo
  53. }
  54. // New creates a new etcdv3 client from a given configuration.
  55. func New(cfg Config) (*Client, error) {
  56. if cfg.RetryDialer == nil {
  57. cfg.RetryDialer = dialEndpointList
  58. }
  59. return newClient(&cfg)
  60. }
  61. // NewFromURL creates a new etcdv3 client from a URL.
  62. func NewFromURL(url string) (*Client, error) {
  63. return New(Config{Endpoints: []string{url}})
  64. }
  65. // Close shuts down the client's etcd connections.
  66. func (c *Client) Close() error {
  67. return c.conn.Close()
  68. }
  69. // Endpoints lists the registered endpoints for the client.
  70. func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
  71. // Errors returns all errors that have been observed since called last.
  72. func (c *Client) Errors() (errs []error) {
  73. c.mu.Lock()
  74. defer c.mu.Unlock()
  75. errs = c.errors
  76. c.errors = nil
  77. return errs
  78. }
  79. // Dial establishes a connection for a given endpoint using the client's config
  80. func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
  81. opts := []grpc.DialOption{
  82. grpc.WithBlock(),
  83. grpc.WithTimeout(c.cfg.DialTimeout),
  84. }
  85. if c.creds != nil {
  86. opts = append(opts, grpc.WithTransportCredentials(*c.creds))
  87. } else {
  88. opts = append(opts, grpc.WithInsecure())
  89. }
  90. if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
  91. f := func(a string, t time.Duration) (net.Conn, error) {
  92. return net.DialTimeout("unix", a, t)
  93. }
  94. // strip unix:// prefix so certs work
  95. endpoint = url.Host
  96. opts = append(opts, grpc.WithDialer(f))
  97. }
  98. conn, err := grpc.Dial(endpoint, opts...)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return conn, nil
  103. }
  104. func newClient(cfg *Config) (*Client, error) {
  105. if cfg == nil {
  106. cfg = &Config{RetryDialer: dialEndpointList}
  107. }
  108. var creds *credentials.TransportAuthenticator
  109. if cfg.TLS != nil {
  110. tlscfg, err := cfg.TLS.ClientConfig()
  111. if err != nil {
  112. return nil, err
  113. }
  114. c := credentials.NewTLS(tlscfg)
  115. creds = &c
  116. }
  117. // use a temporary skeleton client to bootstrap first connection
  118. conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds})
  119. if err != nil {
  120. return nil, err
  121. }
  122. return &Client{
  123. KV: pb.NewKVClient(conn),
  124. Lease: pb.NewLeaseClient(conn),
  125. Watch: pb.NewWatchClient(conn),
  126. Cluster: pb.NewClusterClient(conn),
  127. conn: conn,
  128. cfg: *cfg,
  129. creds: creds,
  130. }, nil
  131. }
  132. // activeConnection returns the current in-use connection
  133. func (c *Client) ActiveConnection() *grpc.ClientConn {
  134. c.mu.RLock()
  135. defer c.mu.RUnlock()
  136. return c.conn
  137. }
  138. // retryConnection establishes a new connection
  139. func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
  140. c.mu.Lock()
  141. defer c.mu.Unlock()
  142. if err != nil {
  143. c.errors = append(c.errors, err)
  144. }
  145. if oldConn != c.conn {
  146. // conn has already been updated
  147. return c.conn, nil
  148. }
  149. conn, dialErr := c.cfg.RetryDialer(c)
  150. if dialErr != nil {
  151. c.errors = append(c.errors, dialErr)
  152. return nil, dialErr
  153. }
  154. c.conn = conn
  155. return c.conn, nil
  156. }
  157. // dialEndpointList attempts to connect to each endpoint in order until a
  158. // connection is established.
  159. func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
  160. var err error
  161. for _, ep := range c.Endpoints() {
  162. conn, curErr := c.Dial(ep)
  163. if curErr != nil {
  164. err = curErr
  165. } else {
  166. return conn, nil
  167. }
  168. }
  169. return nil, err
  170. }
  171. func isRPCError(err error) bool {
  172. return grpc.Code(err) != codes.Unknown
  173. }