grpc_proxy.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "crypto/tls"
  17. "fmt"
  18. "math"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/clientv3/namespace"
  26. "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  27. "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  28. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  29. "github.com/coreos/etcd/pkg/debugutil"
  30. "github.com/coreos/etcd/pkg/transport"
  31. "github.com/coreos/etcd/proxy/grpcproxy"
  32. "github.com/cockroachdb/cmux"
  33. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  34. "github.com/prometheus/client_golang/prometheus"
  35. "github.com/spf13/cobra"
  36. "google.golang.org/grpc"
  37. )
  38. var (
  39. grpcProxyListenAddr string
  40. grpcProxyMetricsListenAddr string
  41. grpcProxyEndpoints []string
  42. grpcProxyDNSCluster string
  43. grpcProxyInsecureDiscovery bool
  44. // tls for connecting to etcd
  45. grpcProxyCA string
  46. grpcProxyCert string
  47. grpcProxyKey string
  48. // tls for clients connecting to proxy
  49. grpcProxyListenCA string
  50. grpcProxyListenCert string
  51. grpcProxyListenKey string
  52. grpcProxyAdvertiseClientURL string
  53. grpcProxyResolverPrefix string
  54. grpcProxyResolverTTL int
  55. grpcProxyNamespace string
  56. grpcProxyEnablePprof bool
  57. )
  58. func init() {
  59. rootCmd.AddCommand(newGRPCProxyCommand())
  60. }
  61. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  62. func newGRPCProxyCommand() *cobra.Command {
  63. lpc := &cobra.Command{
  64. Use: "grpc-proxy <subcommand>",
  65. Short: "grpc-proxy related command",
  66. }
  67. lpc.AddCommand(newGRPCProxyStartCommand())
  68. return lpc
  69. }
  70. func newGRPCProxyStartCommand() *cobra.Command {
  71. cmd := cobra.Command{
  72. Use: "start",
  73. Short: "start the grpc proxy",
  74. Run: startGRPCProxy,
  75. }
  76. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  77. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  78. cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
  79. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  80. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  81. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  82. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  83. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  84. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  85. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  86. // client TLS for connecting to server
  87. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  88. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  89. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  90. // client TLS for connecting to proxy
  91. cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
  92. cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
  93. cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
  94. return &cmd
  95. }
  96. func startGRPCProxy(cmd *cobra.Command, args []string) {
  97. checkArgs()
  98. tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
  99. if tlsinfo != nil {
  100. plog.Infof("ServerTLS: %s", tlsinfo)
  101. }
  102. m := mustListenCMux(tlsinfo)
  103. grpcl := m.Match(cmux.HTTP2())
  104. defer func() {
  105. grpcl.Close()
  106. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  107. }()
  108. client := mustNewClient()
  109. srvhttp, httpl := mustHTTPListener(m, tlsinfo)
  110. errc := make(chan error)
  111. go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
  112. go func() { errc <- srvhttp.Serve(httpl) }()
  113. go func() { errc <- m.Serve() }()
  114. if len(grpcProxyMetricsListenAddr) > 0 {
  115. mhttpl := mustMetricsListener(tlsinfo)
  116. go func() {
  117. mux := http.NewServeMux()
  118. mux.Handle("/metrics", prometheus.Handler())
  119. plog.Fatal(http.Serve(mhttpl, mux))
  120. }()
  121. }
  122. // grpc-proxy is initialized, ready to serve
  123. notifySystemd()
  124. fmt.Fprintln(os.Stderr, <-errc)
  125. os.Exit(1)
  126. }
  127. func checkArgs() {
  128. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  129. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  130. os.Exit(1)
  131. }
  132. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  133. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  134. os.Exit(1)
  135. }
  136. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  137. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  138. os.Exit(1)
  139. }
  140. }
  141. func mustNewClient() *clientv3.Client {
  142. srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
  143. eps := srvs.Endpoints
  144. if len(eps) == 0 {
  145. eps = grpcProxyEndpoints
  146. }
  147. cfg, err := newClientCfg(eps)
  148. if err != nil {
  149. fmt.Fprintln(os.Stderr, err)
  150. os.Exit(1)
  151. }
  152. client, err := clientv3.New(*cfg)
  153. if err != nil {
  154. fmt.Fprintln(os.Stderr, err)
  155. os.Exit(1)
  156. }
  157. return client
  158. }
  159. func newClientCfg(eps []string) (*clientv3.Config, error) {
  160. // set tls if any one tls option set
  161. cfg := clientv3.Config{
  162. Endpoints: eps,
  163. DialTimeout: 5 * time.Second,
  164. }
  165. if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil {
  166. clientTLS, err := tls.ClientConfig()
  167. if err != nil {
  168. return nil, err
  169. }
  170. cfg.TLS = clientTLS
  171. plog.Infof("ClientTLS: %s", tls)
  172. }
  173. // TODO: support insecure tls
  174. return &cfg, nil
  175. }
  176. func newTLS(ca, cert, key string) *transport.TLSInfo {
  177. if ca == "" && cert == "" && key == "" {
  178. return nil
  179. }
  180. return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
  181. }
  182. func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
  183. l, err := net.Listen("tcp", grpcProxyListenAddr)
  184. if err != nil {
  185. fmt.Fprintln(os.Stderr, err)
  186. os.Exit(1)
  187. }
  188. var tlscfg *tls.Config
  189. scheme := "http"
  190. if tlsinfo != nil {
  191. if tlscfg, err = tlsinfo.ServerConfig(); err != nil {
  192. plog.Fatal(err)
  193. }
  194. scheme = "https"
  195. }
  196. if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil {
  197. fmt.Fprintln(os.Stderr, err)
  198. os.Exit(1)
  199. }
  200. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  201. return cmux.New(l)
  202. }
  203. func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
  204. if len(grpcProxyNamespace) > 0 {
  205. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  206. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  207. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  208. }
  209. kvp, _ := grpcproxy.NewKvProxy(client)
  210. watchp, _ := grpcproxy.NewWatchProxy(client)
  211. if grpcProxyResolverPrefix != "" {
  212. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  213. }
  214. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  215. leasep, _ := grpcproxy.NewLeaseProxy(client)
  216. mainp := grpcproxy.NewMaintenanceProxy(client)
  217. authp := grpcproxy.NewAuthProxy(client)
  218. electionp := grpcproxy.NewElectionProxy(client)
  219. lockp := grpcproxy.NewLockProxy(client)
  220. server := grpc.NewServer(
  221. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  222. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  223. grpc.MaxConcurrentStreams(math.MaxUint32),
  224. )
  225. pb.RegisterKVServer(server, kvp)
  226. pb.RegisterWatchServer(server, watchp)
  227. pb.RegisterClusterServer(server, clusterp)
  228. pb.RegisterLeaseServer(server, leasep)
  229. pb.RegisterMaintenanceServer(server, mainp)
  230. pb.RegisterAuthServer(server, authp)
  231. v3electionpb.RegisterElectionServer(server, electionp)
  232. v3lockpb.RegisterLockServer(server, lockp)
  233. return server
  234. }
  235. func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
  236. httpmux := http.NewServeMux()
  237. httpmux.HandleFunc("/", http.NotFound)
  238. httpmux.Handle("/metrics", prometheus.Handler())
  239. if grpcProxyEnablePprof {
  240. for p, h := range debugutil.PProfHandlers() {
  241. httpmux.Handle(p, h)
  242. }
  243. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  244. }
  245. srvhttp := &http.Server{Handler: httpmux}
  246. if tlsinfo == nil {
  247. return srvhttp, m.Match(cmux.HTTP1())
  248. }
  249. srvTLS, err := tlsinfo.ServerConfig()
  250. if err != nil {
  251. plog.Fatalf("could not setup TLS (%v)", err)
  252. }
  253. srvhttp.TLSConfig = srvTLS
  254. return srvhttp, m.Match(cmux.Any())
  255. }
  256. func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
  257. murl, err := url.Parse(grpcProxyMetricsListenAddr)
  258. if err != nil {
  259. fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
  260. os.Exit(1)
  261. }
  262. ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
  263. if err != nil {
  264. fmt.Fprintln(os.Stderr, err)
  265. os.Exit(1)
  266. }
  267. plog.Info("grpc-proxy: listening for metrics on ", murl.String())
  268. return ml
  269. }