grpc_proxy.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. "net"
  19. "net/http"
  20. "os"
  21. "time"
  22. "github.com/coreos/etcd/clientv3"
  23. "github.com/coreos/etcd/clientv3/namespace"
  24. "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  25. "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/debugutil"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "github.com/coreos/etcd/proxy/grpcproxy"
  30. "github.com/cockroachdb/cmux"
  31. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  32. "github.com/prometheus/client_golang/prometheus"
  33. "github.com/spf13/cobra"
  34. "google.golang.org/grpc"
  35. )
  36. var (
  37. grpcProxyListenAddr string
  38. grpcProxyEndpoints []string
  39. grpcProxyDNSCluster string
  40. grpcProxyInsecureDiscovery bool
  41. grpcProxyCert string
  42. grpcProxyKey string
  43. grpcProxyCA string
  44. grpcProxyAdvertiseClientURL string
  45. grpcProxyResolverPrefix string
  46. grpcProxyResolverTTL int
  47. grpcProxyNamespace string
  48. grpcProxyEnablePprof bool
  49. )
  50. func init() {
  51. rootCmd.AddCommand(newGRPCProxyCommand())
  52. }
  53. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  54. func newGRPCProxyCommand() *cobra.Command {
  55. lpc := &cobra.Command{
  56. Use: "grpc-proxy <subcommand>",
  57. Short: "grpc-proxy related command",
  58. }
  59. lpc.AddCommand(newGRPCProxyStartCommand())
  60. return lpc
  61. }
  62. func newGRPCProxyStartCommand() *cobra.Command {
  63. cmd := cobra.Command{
  64. Use: "start",
  65. Short: "start the grpc proxy",
  66. Run: startGRPCProxy,
  67. }
  68. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  69. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  70. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  71. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  72. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  73. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  74. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  75. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  76. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  77. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  78. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  79. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  80. return &cmd
  81. }
  82. func startGRPCProxy(cmd *cobra.Command, args []string) {
  83. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  84. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  85. os.Exit(1)
  86. }
  87. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  88. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  89. os.Exit(1)
  90. }
  91. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  92. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  93. os.Exit(1)
  94. }
  95. srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
  96. if len(srvs.Endpoints) != 0 {
  97. grpcProxyEndpoints = srvs.Endpoints
  98. }
  99. l, err := net.Listen("tcp", grpcProxyListenAddr)
  100. if err != nil {
  101. fmt.Fprintln(os.Stderr, err)
  102. os.Exit(1)
  103. }
  104. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  105. fmt.Fprintln(os.Stderr, err)
  106. os.Exit(1)
  107. }
  108. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  109. defer func() {
  110. l.Close()
  111. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  112. }()
  113. m := cmux.New(l)
  114. cfg, err := newClientCfg()
  115. if err != nil {
  116. fmt.Fprintln(os.Stderr, err)
  117. os.Exit(1)
  118. }
  119. client, err := clientv3.New(*cfg)
  120. if err != nil {
  121. fmt.Fprintln(os.Stderr, err)
  122. os.Exit(1)
  123. }
  124. if len(grpcProxyNamespace) > 0 {
  125. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  126. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  127. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  128. }
  129. kvp, _ := grpcproxy.NewKvProxy(client)
  130. watchp, _ := grpcproxy.NewWatchProxy(client)
  131. if grpcProxyResolverPrefix != "" {
  132. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  133. }
  134. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  135. leasep, _ := grpcproxy.NewLeaseProxy(client)
  136. mainp := grpcproxy.NewMaintenanceProxy(client)
  137. authp := grpcproxy.NewAuthProxy(client)
  138. electionp := grpcproxy.NewElectionProxy(client)
  139. lockp := grpcproxy.NewLockProxy(client)
  140. server := grpc.NewServer(
  141. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  142. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  143. )
  144. pb.RegisterKVServer(server, kvp)
  145. pb.RegisterWatchServer(server, watchp)
  146. pb.RegisterClusterServer(server, clusterp)
  147. pb.RegisterLeaseServer(server, leasep)
  148. pb.RegisterMaintenanceServer(server, mainp)
  149. pb.RegisterAuthServer(server, authp)
  150. v3electionpb.RegisterElectionServer(server, electionp)
  151. v3lockpb.RegisterLockServer(server, lockp)
  152. errc := make(chan error)
  153. grpcl := m.Match(cmux.HTTP2())
  154. go func() { errc <- server.Serve(grpcl) }()
  155. httpmux := http.NewServeMux()
  156. httpmux.HandleFunc("/", http.NotFound)
  157. httpmux.Handle("/metrics", prometheus.Handler())
  158. if grpcProxyEnablePprof {
  159. for p, h := range debugutil.PProfHandlers() {
  160. httpmux.Handle(p, h)
  161. }
  162. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  163. }
  164. srvhttp := &http.Server{
  165. Handler: httpmux,
  166. }
  167. var httpl net.Listener
  168. if cfg.TLS != nil {
  169. srvhttp.TLSConfig = cfg.TLS
  170. httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
  171. } else {
  172. httpl = m.Match(cmux.HTTP1())
  173. }
  174. go func() { errc <- srvhttp.Serve(httpl) }()
  175. go func() { errc <- m.Serve() }()
  176. // grpc-proxy is initialized, ready to serve
  177. notifySystemd()
  178. fmt.Fprintln(os.Stderr, <-errc)
  179. os.Exit(1)
  180. }
  181. func newClientCfg() (*clientv3.Config, error) {
  182. // set tls if any one tls option set
  183. var cfgtls *transport.TLSInfo
  184. tlsinfo := transport.TLSInfo{}
  185. if grpcProxyCert != "" {
  186. tlsinfo.CertFile = grpcProxyCert
  187. cfgtls = &tlsinfo
  188. }
  189. if grpcProxyKey != "" {
  190. tlsinfo.KeyFile = grpcProxyKey
  191. cfgtls = &tlsinfo
  192. }
  193. if grpcProxyCA != "" {
  194. tlsinfo.CAFile = grpcProxyCA
  195. cfgtls = &tlsinfo
  196. }
  197. cfg := clientv3.Config{
  198. Endpoints: grpcProxyEndpoints,
  199. DialTimeout: 5 * time.Second,
  200. }
  201. if cfgtls != nil {
  202. clientTLS, err := cfgtls.ClientConfig()
  203. if err != nil {
  204. return nil, err
  205. }
  206. cfg.TLS = clientTLS
  207. }
  208. // TODO: support insecure tls
  209. return &cfg, nil
  210. }