grpc_proxy.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. "net"
  19. "net/http"
  20. "os"
  21. "time"
  22. "github.com/coreos/etcd/clientv3"
  23. "github.com/coreos/etcd/clientv3/namespace"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "github.com/coreos/etcd/pkg/debugutil"
  26. "github.com/coreos/etcd/pkg/transport"
  27. "github.com/coreos/etcd/proxy/grpcproxy"
  28. "github.com/cockroachdb/cmux"
  29. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  30. "github.com/prometheus/client_golang/prometheus"
  31. "github.com/spf13/cobra"
  32. "google.golang.org/grpc"
  33. )
  34. var (
  35. grpcProxyListenAddr string
  36. grpcProxyEndpoints []string
  37. grpcProxyDNSCluster string
  38. grpcProxyInsecureDiscovery bool
  39. grpcProxyCert string
  40. grpcProxyKey string
  41. grpcProxyCA string
  42. grpcProxyAdvertiseClientURL string
  43. grpcProxyResolverPrefix string
  44. grpcProxyResolverTTL int
  45. grpcProxyNamespace string
  46. grpcProxyEnablePprof bool
  47. )
  48. func init() {
  49. rootCmd.AddCommand(newGRPCProxyCommand())
  50. }
  51. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  52. func newGRPCProxyCommand() *cobra.Command {
  53. lpc := &cobra.Command{
  54. Use: "grpc-proxy <subcommand>",
  55. Short: "grpc-proxy related command",
  56. }
  57. lpc.AddCommand(newGRPCProxyStartCommand())
  58. return lpc
  59. }
  60. func newGRPCProxyStartCommand() *cobra.Command {
  61. cmd := cobra.Command{
  62. Use: "start",
  63. Short: "start the grpc proxy",
  64. Run: startGRPCProxy,
  65. }
  66. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  67. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  68. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  69. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  70. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  71. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  72. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  73. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  74. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  75. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  76. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  77. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  78. return &cmd
  79. }
  80. func startGRPCProxy(cmd *cobra.Command, args []string) {
  81. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  82. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  83. os.Exit(1)
  84. }
  85. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  86. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  87. os.Exit(1)
  88. }
  89. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  90. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  91. os.Exit(1)
  92. }
  93. srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
  94. if len(srvs.Endpoints) != 0 {
  95. grpcProxyEndpoints = srvs.Endpoints
  96. }
  97. l, err := net.Listen("tcp", grpcProxyListenAddr)
  98. if err != nil {
  99. fmt.Fprintln(os.Stderr, err)
  100. os.Exit(1)
  101. }
  102. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  103. fmt.Fprintln(os.Stderr, err)
  104. os.Exit(1)
  105. }
  106. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  107. defer func() {
  108. l.Close()
  109. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  110. }()
  111. m := cmux.New(l)
  112. cfg, err := newClientCfg()
  113. if err != nil {
  114. fmt.Fprintln(os.Stderr, err)
  115. os.Exit(1)
  116. }
  117. client, err := clientv3.New(*cfg)
  118. if err != nil {
  119. fmt.Fprintln(os.Stderr, err)
  120. os.Exit(1)
  121. }
  122. if len(grpcProxyNamespace) > 0 {
  123. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  124. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  125. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  126. }
  127. kvp, _ := grpcproxy.NewKvProxy(client)
  128. watchp, _ := grpcproxy.NewWatchProxy(client)
  129. if grpcProxyResolverPrefix != "" {
  130. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  131. }
  132. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  133. leasep, _ := grpcproxy.NewLeaseProxy(client)
  134. mainp := grpcproxy.NewMaintenanceProxy(client)
  135. authp := grpcproxy.NewAuthProxy(client)
  136. server := grpc.NewServer(
  137. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  138. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  139. )
  140. pb.RegisterKVServer(server, kvp)
  141. pb.RegisterWatchServer(server, watchp)
  142. pb.RegisterClusterServer(server, clusterp)
  143. pb.RegisterLeaseServer(server, leasep)
  144. pb.RegisterMaintenanceServer(server, mainp)
  145. pb.RegisterAuthServer(server, authp)
  146. errc := make(chan error)
  147. grpcl := m.Match(cmux.HTTP2())
  148. go func() { errc <- server.Serve(grpcl) }()
  149. httpmux := http.NewServeMux()
  150. httpmux.HandleFunc("/", http.NotFound)
  151. httpmux.Handle("/metrics", prometheus.Handler())
  152. if grpcProxyEnablePprof {
  153. for p, h := range debugutil.PProfHandlers() {
  154. httpmux.Handle(p, h)
  155. }
  156. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  157. }
  158. srvhttp := &http.Server{
  159. Handler: httpmux,
  160. }
  161. var httpl net.Listener
  162. if cfg.TLS != nil {
  163. srvhttp.TLSConfig = cfg.TLS
  164. httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
  165. } else {
  166. httpl = m.Match(cmux.HTTP1())
  167. }
  168. go func() { errc <- srvhttp.Serve(httpl) }()
  169. go func() { errc <- m.Serve() }()
  170. // grpc-proxy is initialized, ready to serve
  171. notifySystemd()
  172. fmt.Fprintln(os.Stderr, <-errc)
  173. os.Exit(1)
  174. }
  175. func newClientCfg() (*clientv3.Config, error) {
  176. // set tls if any one tls option set
  177. var cfgtls *transport.TLSInfo
  178. tlsinfo := transport.TLSInfo{}
  179. if grpcProxyCert != "" {
  180. tlsinfo.CertFile = grpcProxyCert
  181. cfgtls = &tlsinfo
  182. }
  183. if grpcProxyKey != "" {
  184. tlsinfo.KeyFile = grpcProxyKey
  185. cfgtls = &tlsinfo
  186. }
  187. if grpcProxyCA != "" {
  188. tlsinfo.CAFile = grpcProxyCA
  189. cfgtls = &tlsinfo
  190. }
  191. cfg := clientv3.Config{
  192. Endpoints: grpcProxyEndpoints,
  193. DialTimeout: 5 * time.Second,
  194. }
  195. if cfgtls != nil {
  196. clientTLS, err := cfgtls.ClientConfig()
  197. if err != nil {
  198. return nil, err
  199. }
  200. cfg.TLS = clientTLS
  201. }
  202. // TODO: support insecure tls
  203. return &cfg, nil
  204. }