grpc_proxy.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "crypto/x509"
  19. "fmt"
  20. "io/ioutil"
  21. "log"
  22. "math"
  23. "net"
  24. "net/http"
  25. "net/url"
  26. "os"
  27. "path/filepath"
  28. "time"
  29. "go.etcd.io/etcd/clientv3"
  30. "go.etcd.io/etcd/clientv3/leasing"
  31. "go.etcd.io/etcd/clientv3/namespace"
  32. "go.etcd.io/etcd/clientv3/ordering"
  33. "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  34. "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  35. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  36. "go.etcd.io/etcd/pkg/debugutil"
  37. "go.etcd.io/etcd/pkg/logutil"
  38. "go.etcd.io/etcd/pkg/transport"
  39. "go.etcd.io/etcd/proxy/grpcproxy"
  40. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  41. "github.com/soheilhy/cmux"
  42. "github.com/spf13/cobra"
  43. "go.uber.org/zap"
  44. "google.golang.org/grpc"
  45. "google.golang.org/grpc/grpclog"
  46. )
  47. var (
  48. grpcProxyListenAddr string
  49. grpcProxyMetricsListenAddr string
  50. grpcProxyEndpoints []string
  51. grpcProxyDNSCluster string
  52. grpcProxyDNSClusterServiceName string
  53. grpcProxyInsecureDiscovery bool
  54. grpcProxyDataDir string
  55. grpcMaxCallSendMsgSize int
  56. grpcMaxCallRecvMsgSize int
  57. // tls for connecting to etcd
  58. grpcProxyCA string
  59. grpcProxyCert string
  60. grpcProxyKey string
  61. grpcProxyInsecureSkipTLSVerify bool
  62. // tls for clients connecting to proxy
  63. grpcProxyListenCA string
  64. grpcProxyListenCert string
  65. grpcProxyListenKey string
  66. grpcProxyListenAutoTLS bool
  67. grpcProxyListenCRL string
  68. grpcProxyAdvertiseClientURL string
  69. grpcProxyResolverPrefix string
  70. grpcProxyResolverTTL int
  71. grpcProxyNamespace string
  72. grpcProxyLeasing string
  73. grpcProxyEnablePprof bool
  74. grpcProxyEnableOrdering bool
  75. grpcProxyDebug bool
  76. )
  77. const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
  78. func init() {
  79. rootCmd.AddCommand(newGRPCProxyCommand())
  80. }
  81. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  82. func newGRPCProxyCommand() *cobra.Command {
  83. lpc := &cobra.Command{
  84. Use: "grpc-proxy <subcommand>",
  85. Short: "grpc-proxy related command",
  86. }
  87. lpc.AddCommand(newGRPCProxyStartCommand())
  88. return lpc
  89. }
  90. func newGRPCProxyStartCommand() *cobra.Command {
  91. cmd := cobra.Command{
  92. Use: "start",
  93. Short: "start the grpc proxy",
  94. Run: startGRPCProxy,
  95. }
  96. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  97. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
  98. cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
  99. cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface")
  100. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  101. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  102. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  103. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  104. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  105. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  106. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  107. cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
  108. cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
  109. cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
  110. // client TLS for connecting to server
  111. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  112. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  113. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  114. cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
  115. // client TLS for connecting to proxy
  116. cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
  117. cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
  118. cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
  119. cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
  120. cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
  121. // experimental flags
  122. cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
  123. cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
  124. cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
  125. return &cmd
  126. }
  127. func startGRPCProxy(cmd *cobra.Command, args []string) {
  128. checkArgs()
  129. lcfg := logutil.DefaultZapLoggerConfig
  130. if grpcProxyDebug {
  131. lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
  132. grpc.EnableTracing = true
  133. }
  134. lg, err := lcfg.Build()
  135. if err != nil {
  136. log.Fatal(err)
  137. }
  138. defer lg.Sync()
  139. var gl grpclog.LoggerV2
  140. gl, err = logutil.NewGRPCLoggerV2(lcfg)
  141. if err != nil {
  142. log.Fatal(err)
  143. }
  144. grpclog.SetLoggerV2(gl)
  145. tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
  146. if tlsinfo == nil && grpcProxyListenAutoTLS {
  147. host := []string{"https://" + grpcProxyListenAddr}
  148. dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
  149. autoTLS, err := transport.SelfCert(lg, dir, host)
  150. if err != nil {
  151. log.Fatal(err)
  152. }
  153. tlsinfo = &autoTLS
  154. }
  155. if tlsinfo != nil {
  156. lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
  157. }
  158. m := mustListenCMux(lg, tlsinfo)
  159. grpcl := m.Match(cmux.HTTP2())
  160. defer func() {
  161. grpcl.Close()
  162. lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
  163. }()
  164. client := mustNewClient(lg)
  165. httpClient := mustNewHTTPClient(lg)
  166. srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
  167. errc := make(chan error)
  168. go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
  169. go func() { errc <- srvhttp.Serve(httpl) }()
  170. go func() { errc <- m.Serve() }()
  171. if len(grpcProxyMetricsListenAddr) > 0 {
  172. mhttpl := mustMetricsListener(lg, tlsinfo)
  173. go func() {
  174. mux := http.NewServeMux()
  175. grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
  176. grpcproxy.HandleHealth(mux, client)
  177. lg.Info("gRPC proxy server metrics URL serving")
  178. herr := http.Serve(mhttpl, mux)
  179. if herr != nil {
  180. lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr))
  181. } else {
  182. lg.Info("gRPC proxy server metrics URL returned")
  183. }
  184. }()
  185. }
  186. lg.Info("started gRPC proxy", zap.String("address", grpcProxyListenAddr))
  187. // grpc-proxy is initialized, ready to serve
  188. notifySystemd(lg)
  189. fmt.Fprintln(os.Stderr, <-errc)
  190. os.Exit(1)
  191. }
  192. func checkArgs() {
  193. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  194. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  195. os.Exit(1)
  196. }
  197. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  198. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  199. os.Exit(1)
  200. }
  201. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  202. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  203. os.Exit(1)
  204. }
  205. }
  206. func mustNewClient(lg *zap.Logger) *clientv3.Client {
  207. srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery, grpcProxyDNSClusterServiceName)
  208. eps := srvs.Endpoints
  209. if len(eps) == 0 {
  210. eps = grpcProxyEndpoints
  211. }
  212. cfg, err := newClientCfg(lg, eps)
  213. if err != nil {
  214. fmt.Fprintln(os.Stderr, err)
  215. os.Exit(1)
  216. }
  217. cfg.DialOptions = append(cfg.DialOptions,
  218. grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
  219. cfg.DialOptions = append(cfg.DialOptions,
  220. grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
  221. client, err := clientv3.New(*cfg)
  222. if err != nil {
  223. fmt.Fprintln(os.Stderr, err)
  224. os.Exit(1)
  225. }
  226. return client
  227. }
  228. func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
  229. // set tls if any one tls option set
  230. cfg := clientv3.Config{
  231. Endpoints: eps,
  232. DialTimeout: 5 * time.Second,
  233. }
  234. if grpcMaxCallSendMsgSize > 0 {
  235. cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
  236. }
  237. if grpcMaxCallRecvMsgSize > 0 {
  238. cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
  239. }
  240. tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
  241. if tls == nil && grpcProxyInsecureSkipTLSVerify {
  242. tls = &transport.TLSInfo{}
  243. }
  244. if tls != nil {
  245. clientTLS, err := tls.ClientConfig()
  246. if err != nil {
  247. return nil, err
  248. }
  249. clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
  250. cfg.TLS = clientTLS
  251. lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
  252. }
  253. return &cfg, nil
  254. }
  255. func newTLS(ca, cert, key string) *transport.TLSInfo {
  256. if ca == "" && cert == "" && key == "" {
  257. return nil
  258. }
  259. return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: true}
  260. }
  261. func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
  262. l, err := net.Listen("tcp", grpcProxyListenAddr)
  263. if err != nil {
  264. fmt.Fprintln(os.Stderr, err)
  265. os.Exit(1)
  266. }
  267. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  268. fmt.Fprintln(os.Stderr, err)
  269. os.Exit(1)
  270. }
  271. if tlsinfo != nil {
  272. tlsinfo.CRLFile = grpcProxyListenCRL
  273. if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
  274. lg.Fatal("failed to create TLS listener", zap.Error(err))
  275. }
  276. }
  277. lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
  278. return cmux.New(l)
  279. }
  280. func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
  281. if grpcProxyEnableOrdering {
  282. vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
  283. client.KV = ordering.NewKV(client.KV, vf)
  284. lg.Info("waiting for linearized read from cluster to recover ordering")
  285. for {
  286. _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
  287. if err == nil {
  288. break
  289. }
  290. lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
  291. time.Sleep(time.Second)
  292. }
  293. }
  294. if len(grpcProxyNamespace) > 0 {
  295. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  296. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  297. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  298. }
  299. if len(grpcProxyLeasing) > 0 {
  300. client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
  301. }
  302. kvp, _ := grpcproxy.NewKvProxy(client)
  303. watchp, _ := grpcproxy.NewWatchProxy(client)
  304. if grpcProxyResolverPrefix != "" {
  305. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  306. }
  307. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  308. leasep, _ := grpcproxy.NewLeaseProxy(client)
  309. mainp := grpcproxy.NewMaintenanceProxy(client)
  310. authp := grpcproxy.NewAuthProxy(client)
  311. electionp := grpcproxy.NewElectionProxy(client)
  312. lockp := grpcproxy.NewLockProxy(client)
  313. server := grpc.NewServer(
  314. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  315. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  316. grpc.MaxConcurrentStreams(math.MaxUint32),
  317. )
  318. pb.RegisterKVServer(server, kvp)
  319. pb.RegisterWatchServer(server, watchp)
  320. pb.RegisterClusterServer(server, clusterp)
  321. pb.RegisterLeaseServer(server, leasep)
  322. pb.RegisterMaintenanceServer(server, mainp)
  323. pb.RegisterAuthServer(server, authp)
  324. v3electionpb.RegisterElectionServer(server, electionp)
  325. v3lockpb.RegisterLockServer(server, lockp)
  326. return server
  327. }
  328. func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
  329. httpClient := mustNewHTTPClient(lg)
  330. httpmux := http.NewServeMux()
  331. httpmux.HandleFunc("/", http.NotFound)
  332. grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
  333. grpcproxy.HandleHealth(httpmux, c)
  334. if grpcProxyEnablePprof {
  335. for p, h := range debugutil.PProfHandlers() {
  336. httpmux.Handle(p, h)
  337. }
  338. lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf))
  339. }
  340. srvhttp := &http.Server{
  341. Handler: httpmux,
  342. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  343. }
  344. if tlsinfo == nil {
  345. return srvhttp, m.Match(cmux.HTTP1())
  346. }
  347. srvTLS, err := tlsinfo.ServerConfig()
  348. if err != nil {
  349. lg.Fatal("failed to set up TLS", zap.Error(err))
  350. }
  351. srvhttp.TLSConfig = srvTLS
  352. return srvhttp, m.Match(cmux.Any())
  353. }
  354. func mustNewHTTPClient(lg *zap.Logger) *http.Client {
  355. transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey)
  356. if err != nil {
  357. fmt.Fprintln(os.Stderr, err)
  358. os.Exit(1)
  359. }
  360. return &http.Client{Transport: transport}
  361. }
  362. func newHTTPTransport(ca, cert, key string) (*http.Transport, error) {
  363. tr := &http.Transport{}
  364. if ca != "" && cert != "" && key != "" {
  365. caCert, err := ioutil.ReadFile(ca)
  366. if err != nil {
  367. return nil, err
  368. }
  369. keyPair, err := tls.LoadX509KeyPair(cert, key)
  370. if err != nil {
  371. return nil, err
  372. }
  373. caPool := x509.NewCertPool()
  374. caPool.AppendCertsFromPEM(caCert)
  375. tlsConfig := &tls.Config{
  376. Certificates: []tls.Certificate{keyPair},
  377. RootCAs: caPool,
  378. }
  379. tlsConfig.BuildNameToCertificate()
  380. tr.TLSClientConfig = tlsConfig
  381. } else if grpcProxyInsecureSkipTLSVerify {
  382. tlsConfig := &tls.Config{InsecureSkipVerify: grpcProxyInsecureSkipTLSVerify}
  383. tr.TLSClientConfig = tlsConfig
  384. }
  385. return tr, nil
  386. }
  387. func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
  388. murl, err := url.Parse(grpcProxyMetricsListenAddr)
  389. if err != nil {
  390. fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
  391. os.Exit(1)
  392. }
  393. ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
  394. if err != nil {
  395. fmt.Fprintln(os.Stderr, err)
  396. os.Exit(1)
  397. }
  398. lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String()))
  399. return ml
  400. }