grpc_proxy.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. "net"
  19. "net/http"
  20. "net/url"
  21. "os"
  22. "time"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/coreos/etcd/clientv3/namespace"
  25. "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  26. "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  27. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  28. "github.com/coreos/etcd/pkg/debugutil"
  29. "github.com/coreos/etcd/pkg/transport"
  30. "github.com/coreos/etcd/proxy/grpcproxy"
  31. "github.com/cockroachdb/cmux"
  32. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  33. "github.com/prometheus/client_golang/prometheus"
  34. "github.com/spf13/cobra"
  35. "google.golang.org/grpc"
  36. )
  37. var (
  38. grpcProxyListenAddr string
  39. grpcProxyMetricsListenAddr string
  40. grpcProxyEndpoints []string
  41. grpcProxyDNSCluster string
  42. grpcProxyInsecureDiscovery bool
  43. grpcProxyCert string
  44. grpcProxyKey string
  45. grpcProxyCA string
  46. grpcProxyAdvertiseClientURL string
  47. grpcProxyResolverPrefix string
  48. grpcProxyResolverTTL int
  49. grpcProxyNamespace string
  50. grpcProxyEnablePprof bool
  51. )
  52. func init() {
  53. rootCmd.AddCommand(newGRPCProxyCommand())
  54. }
  55. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  56. func newGRPCProxyCommand() *cobra.Command {
  57. lpc := &cobra.Command{
  58. Use: "grpc-proxy <subcommand>",
  59. Short: "grpc-proxy related command",
  60. }
  61. lpc.AddCommand(newGRPCProxyStartCommand())
  62. return lpc
  63. }
  64. func newGRPCProxyStartCommand() *cobra.Command {
  65. cmd := cobra.Command{
  66. Use: "start",
  67. Short: "start the grpc proxy",
  68. Run: startGRPCProxy,
  69. }
  70. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  71. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  72. cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
  73. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  74. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  75. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  76. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  77. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  78. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  79. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  80. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  81. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  82. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  83. return &cmd
  84. }
  85. func startGRPCProxy(cmd *cobra.Command, args []string) {
  86. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  87. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  88. os.Exit(1)
  89. }
  90. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  91. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  92. os.Exit(1)
  93. }
  94. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  95. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  96. os.Exit(1)
  97. }
  98. srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
  99. if len(srvs.Endpoints) != 0 {
  100. grpcProxyEndpoints = srvs.Endpoints
  101. }
  102. l, err := net.Listen("tcp", grpcProxyListenAddr)
  103. if err != nil {
  104. fmt.Fprintln(os.Stderr, err)
  105. os.Exit(1)
  106. }
  107. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  108. fmt.Fprintln(os.Stderr, err)
  109. os.Exit(1)
  110. }
  111. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  112. defer func() {
  113. l.Close()
  114. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  115. }()
  116. m := cmux.New(l)
  117. cfg, cfgtls, err := newClientCfg()
  118. if err != nil {
  119. fmt.Fprintln(os.Stderr, err)
  120. os.Exit(1)
  121. }
  122. client, err := clientv3.New(*cfg)
  123. if err != nil {
  124. fmt.Fprintln(os.Stderr, err)
  125. os.Exit(1)
  126. }
  127. if len(grpcProxyNamespace) > 0 {
  128. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  129. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  130. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  131. }
  132. kvp, _ := grpcproxy.NewKvProxy(client)
  133. watchp, _ := grpcproxy.NewWatchProxy(client)
  134. if grpcProxyResolverPrefix != "" {
  135. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  136. }
  137. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  138. leasep, _ := grpcproxy.NewLeaseProxy(client)
  139. mainp := grpcproxy.NewMaintenanceProxy(client)
  140. authp := grpcproxy.NewAuthProxy(client)
  141. electionp := grpcproxy.NewElectionProxy(client)
  142. lockp := grpcproxy.NewLockProxy(client)
  143. server := grpc.NewServer(
  144. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  145. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  146. )
  147. pb.RegisterKVServer(server, kvp)
  148. pb.RegisterWatchServer(server, watchp)
  149. pb.RegisterClusterServer(server, clusterp)
  150. pb.RegisterLeaseServer(server, leasep)
  151. pb.RegisterMaintenanceServer(server, mainp)
  152. pb.RegisterAuthServer(server, authp)
  153. v3electionpb.RegisterElectionServer(server, electionp)
  154. v3lockpb.RegisterLockServer(server, lockp)
  155. errc := make(chan error)
  156. grpcl := m.Match(cmux.HTTP2())
  157. go func() { errc <- server.Serve(grpcl) }()
  158. httpmux := http.NewServeMux()
  159. httpmux.HandleFunc("/", http.NotFound)
  160. httpmux.Handle("/metrics", prometheus.Handler())
  161. if grpcProxyEnablePprof {
  162. for p, h := range debugutil.PProfHandlers() {
  163. httpmux.Handle(p, h)
  164. }
  165. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  166. }
  167. srvhttp := &http.Server{
  168. Handler: httpmux,
  169. }
  170. var httpl net.Listener
  171. if cfg.TLS != nil {
  172. srvhttp.TLSConfig = cfg.TLS
  173. httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
  174. } else {
  175. httpl = m.Match(cmux.HTTP1())
  176. }
  177. go func() { errc <- srvhttp.Serve(httpl) }()
  178. go func() { errc <- m.Serve() }()
  179. if len(grpcProxyMetricsListenAddr) > 0 {
  180. murl, err := url.Parse(grpcProxyMetricsListenAddr)
  181. if err != nil {
  182. fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
  183. os.Exit(1)
  184. }
  185. ml, err := transport.NewListener(murl.Host, murl.Scheme, cfgtls)
  186. if err != nil {
  187. fmt.Fprintln(os.Stderr, err)
  188. os.Exit(1)
  189. }
  190. mux := http.NewServeMux()
  191. mux.Handle("/metrics", prometheus.Handler())
  192. go func() {
  193. plog.Info("grpc-proxy: listening for metrics on ", murl.String())
  194. plog.Fatal(http.Serve(ml, mux))
  195. }()
  196. }
  197. // grpc-proxy is initialized, ready to serve
  198. notifySystemd()
  199. fmt.Fprintln(os.Stderr, <-errc)
  200. os.Exit(1)
  201. }
  202. func newClientCfg() (*clientv3.Config, *transport.TLSInfo, error) {
  203. // set tls if any one tls option set
  204. var cfgtls *transport.TLSInfo
  205. tlsinfo := transport.TLSInfo{}
  206. if grpcProxyCert != "" {
  207. tlsinfo.CertFile = grpcProxyCert
  208. cfgtls = &tlsinfo
  209. }
  210. if grpcProxyKey != "" {
  211. tlsinfo.KeyFile = grpcProxyKey
  212. cfgtls = &tlsinfo
  213. }
  214. if grpcProxyCA != "" {
  215. tlsinfo.CAFile = grpcProxyCA
  216. cfgtls = &tlsinfo
  217. }
  218. cfg := clientv3.Config{
  219. Endpoints: grpcProxyEndpoints,
  220. DialTimeout: 5 * time.Second,
  221. }
  222. if cfgtls != nil {
  223. clientTLS, err := cfgtls.ClientConfig()
  224. if err != nil {
  225. return nil, nil, err
  226. }
  227. cfg.TLS = clientTLS
  228. }
  229. // TODO: support insecure tls
  230. return &cfg, cfgtls, nil
  231. }