grpc_proxy.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. "net"
  19. "net/http"
  20. "os"
  21. "time"
  22. "github.com/coreos/etcd/clientv3"
  23. "github.com/coreos/etcd/clientv3/namespace"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "github.com/coreos/etcd/pkg/debugutil"
  26. "github.com/coreos/etcd/pkg/transport"
  27. "github.com/coreos/etcd/proxy/grpcproxy"
  28. "github.com/cockroachdb/cmux"
  29. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  30. "github.com/prometheus/client_golang/prometheus"
  31. "github.com/spf13/cobra"
  32. "google.golang.org/grpc"
  33. )
  34. var (
  35. grpcProxyListenAddr string
  36. grpcProxyEndpoints []string
  37. grpcProxyDNSCluster string
  38. grpcProxyInsecureDiscovery bool
  39. grpcProxyCert string
  40. grpcProxyKey string
  41. grpcProxyCA string
  42. grpcProxyAdvertiseClientURL string
  43. grpcProxyResolverPrefix string
  44. grpcProxyResolverTTL int
  45. grpcProxyNamespace string
  46. grpcProxyEnablePprof bool
  47. )
  48. func init() {
  49. rootCmd.AddCommand(newGRPCProxyCommand())
  50. }
  51. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  52. func newGRPCProxyCommand() *cobra.Command {
  53. lpc := &cobra.Command{
  54. Use: "grpc-proxy <subcommand>",
  55. Short: "grpc-proxy related command",
  56. }
  57. lpc.AddCommand(newGRPCProxyStartCommand())
  58. return lpc
  59. }
  60. func newGRPCProxyStartCommand() *cobra.Command {
  61. cmd := cobra.Command{
  62. Use: "start",
  63. Short: "start the grpc proxy",
  64. Run: startGRPCProxy,
  65. }
  66. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  67. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  68. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  69. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  70. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  71. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  72. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  73. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  74. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  75. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  76. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  77. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  78. return &cmd
  79. }
  80. func startGRPCProxy(cmd *cobra.Command, args []string) {
  81. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  82. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  83. os.Exit(1)
  84. }
  85. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  86. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  87. os.Exit(1)
  88. }
  89. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  90. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  91. os.Exit(1)
  92. }
  93. if eps := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery); len(eps) != 0 {
  94. grpcProxyEndpoints = eps
  95. }
  96. l, err := net.Listen("tcp", grpcProxyListenAddr)
  97. if err != nil {
  98. fmt.Fprintln(os.Stderr, err)
  99. os.Exit(1)
  100. }
  101. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  102. fmt.Fprintln(os.Stderr, err)
  103. os.Exit(1)
  104. }
  105. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  106. defer func() {
  107. l.Close()
  108. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  109. }()
  110. m := cmux.New(l)
  111. cfg, err := newClientCfg()
  112. if err != nil {
  113. fmt.Fprintln(os.Stderr, err)
  114. os.Exit(1)
  115. }
  116. client, err := clientv3.New(*cfg)
  117. if err != nil {
  118. fmt.Fprintln(os.Stderr, err)
  119. os.Exit(1)
  120. }
  121. if len(grpcProxyNamespace) > 0 {
  122. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  123. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  124. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  125. }
  126. kvp, _ := grpcproxy.NewKvProxy(client)
  127. watchp, _ := grpcproxy.NewWatchProxy(client)
  128. if grpcProxyResolverPrefix != "" {
  129. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  130. }
  131. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  132. leasep, _ := grpcproxy.NewLeaseProxy(client)
  133. mainp := grpcproxy.NewMaintenanceProxy(client)
  134. authp := grpcproxy.NewAuthProxy(client)
  135. server := grpc.NewServer(
  136. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  137. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  138. )
  139. pb.RegisterKVServer(server, kvp)
  140. pb.RegisterWatchServer(server, watchp)
  141. pb.RegisterClusterServer(server, clusterp)
  142. pb.RegisterLeaseServer(server, leasep)
  143. pb.RegisterMaintenanceServer(server, mainp)
  144. pb.RegisterAuthServer(server, authp)
  145. errc := make(chan error)
  146. grpcl := m.Match(cmux.HTTP2())
  147. go func() { errc <- server.Serve(grpcl) }()
  148. httpmux := http.NewServeMux()
  149. httpmux.HandleFunc("/", http.NotFound)
  150. httpmux.Handle("/metrics", prometheus.Handler())
  151. if grpcProxyEnablePprof {
  152. for p, h := range debugutil.PProfHandlers() {
  153. httpmux.Handle(p, h)
  154. }
  155. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  156. }
  157. srvhttp := &http.Server{
  158. Handler: httpmux,
  159. }
  160. var httpl net.Listener
  161. if cfg.TLS != nil {
  162. srvhttp.TLSConfig = cfg.TLS
  163. httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
  164. } else {
  165. httpl = m.Match(cmux.HTTP1())
  166. }
  167. go func() { errc <- srvhttp.Serve(httpl) }()
  168. go func() { errc <- m.Serve() }()
  169. // grpc-proxy is initialized, ready to serve
  170. notifySystemd()
  171. fmt.Fprintln(os.Stderr, <-errc)
  172. os.Exit(1)
  173. }
  174. func newClientCfg() (*clientv3.Config, error) {
  175. // set tls if any one tls option set
  176. var cfgtls *transport.TLSInfo
  177. tlsinfo := transport.TLSInfo{}
  178. if grpcProxyCert != "" {
  179. tlsinfo.CertFile = grpcProxyCert
  180. cfgtls = &tlsinfo
  181. }
  182. if grpcProxyKey != "" {
  183. tlsinfo.KeyFile = grpcProxyKey
  184. cfgtls = &tlsinfo
  185. }
  186. if grpcProxyCA != "" {
  187. tlsinfo.CAFile = grpcProxyCA
  188. cfgtls = &tlsinfo
  189. }
  190. cfg := clientv3.Config{
  191. Endpoints: grpcProxyEndpoints,
  192. DialTimeout: 5 * time.Second,
  193. }
  194. if cfgtls != nil {
  195. clientTLS, err := cfgtls.ClientConfig()
  196. if err != nil {
  197. return nil, err
  198. }
  199. cfg.TLS = clientTLS
  200. }
  201. // TODO: support insecure tls
  202. return &cfg, nil
  203. }