grpc_proxy.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "math"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "path/filepath"
  26. "time"
  27. "go.etcd.io/etcd/clientv3"
  28. "go.etcd.io/etcd/clientv3/leasing"
  29. "go.etcd.io/etcd/clientv3/namespace"
  30. "go.etcd.io/etcd/clientv3/ordering"
  31. "go.etcd.io/etcd/etcdserver/api/etcdhttp"
  32. "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  33. "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  34. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  35. "go.etcd.io/etcd/pkg/debugutil"
  36. "go.etcd.io/etcd/pkg/logutil"
  37. "go.etcd.io/etcd/pkg/transport"
  38. "go.etcd.io/etcd/proxy/grpcproxy"
  39. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  40. "github.com/soheilhy/cmux"
  41. "github.com/spf13/cobra"
  42. "go.uber.org/zap"
  43. "google.golang.org/grpc"
  44. "google.golang.org/grpc/grpclog"
  45. )
  46. var (
  47. grpcProxyListenAddr string
  48. grpcProxyMetricsListenAddr string
  49. grpcProxyEndpoints []string
  50. grpcProxyDNSCluster string
  51. grpcProxyDNSClusterServiceName string
  52. grpcProxyInsecureDiscovery bool
  53. grpcProxyDataDir string
  54. grpcMaxCallSendMsgSize int
  55. grpcMaxCallRecvMsgSize int
  56. // tls for connecting to etcd
  57. grpcProxyCA string
  58. grpcProxyCert string
  59. grpcProxyKey string
  60. grpcProxyInsecureSkipTLSVerify bool
  61. // tls for clients connecting to proxy
  62. grpcProxyListenCA string
  63. grpcProxyListenCert string
  64. grpcProxyListenKey string
  65. grpcProxyListenAutoTLS bool
  66. grpcProxyListenCRL string
  67. grpcProxyAdvertiseClientURL string
  68. grpcProxyResolverPrefix string
  69. grpcProxyResolverTTL int
  70. grpcProxyNamespace string
  71. grpcProxyLeasing string
  72. grpcProxyEnablePprof bool
  73. grpcProxyEnableOrdering bool
  74. grpcProxyDebug bool
  75. )
  76. const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
  77. func init() {
  78. rootCmd.AddCommand(newGRPCProxyCommand())
  79. }
  80. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  81. func newGRPCProxyCommand() *cobra.Command {
  82. lpc := &cobra.Command{
  83. Use: "grpc-proxy <subcommand>",
  84. Short: "grpc-proxy related command",
  85. }
  86. lpc.AddCommand(newGRPCProxyStartCommand())
  87. return lpc
  88. }
  89. func newGRPCProxyStartCommand() *cobra.Command {
  90. cmd := cobra.Command{
  91. Use: "start",
  92. Short: "start the grpc proxy",
  93. Run: startGRPCProxy,
  94. }
  95. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  96. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
  97. cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
  98. cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
  99. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  100. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  101. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  102. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  103. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  104. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  105. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  106. cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
  107. cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
  108. cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
  109. // client TLS for connecting to server
  110. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  111. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  112. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  113. cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
  114. // client TLS for connecting to proxy
  115. cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
  116. cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
  117. cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
  118. cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
  119. cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
  120. // experimental flags
  121. cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
  122. cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
  123. cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
  124. return &cmd
  125. }
  126. func startGRPCProxy(cmd *cobra.Command, args []string) {
  127. checkArgs()
  128. lcfg := zap.Config{
  129. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  130. Development: false,
  131. Sampling: &zap.SamplingConfig{
  132. Initial: 100,
  133. Thereafter: 100,
  134. },
  135. Encoding: "json",
  136. EncoderConfig: zap.NewProductionEncoderConfig(),
  137. OutputPaths: []string{"stderr"},
  138. ErrorOutputPaths: []string{"stderr"},
  139. }
  140. if grpcProxyDebug {
  141. lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
  142. grpc.EnableTracing = true
  143. }
  144. lg, err := lcfg.Build()
  145. if err != nil {
  146. log.Fatal(err)
  147. }
  148. defer lg.Sync()
  149. var gl grpclog.LoggerV2
  150. gl, err = logutil.NewGRPCLoggerV2(lcfg)
  151. if err != nil {
  152. log.Fatal(err)
  153. }
  154. grpclog.SetLoggerV2(gl)
  155. tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
  156. if tlsinfo == nil && grpcProxyListenAutoTLS {
  157. host := []string{"https://" + grpcProxyListenAddr}
  158. dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
  159. autoTLS, err := transport.SelfCert(lg, dir, host)
  160. if err != nil {
  161. log.Fatal(err)
  162. }
  163. tlsinfo = &autoTLS
  164. }
  165. if tlsinfo != nil {
  166. lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
  167. }
  168. m := mustListenCMux(lg, tlsinfo)
  169. grpcl := m.Match(cmux.HTTP2())
  170. defer func() {
  171. grpcl.Close()
  172. lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
  173. }()
  174. client := mustNewClient(lg)
  175. srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
  176. errc := make(chan error)
  177. go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
  178. go func() { errc <- srvhttp.Serve(httpl) }()
  179. go func() { errc <- m.Serve() }()
  180. if len(grpcProxyMetricsListenAddr) > 0 {
  181. mhttpl := mustMetricsListener(lg, tlsinfo)
  182. go func() {
  183. mux := http.NewServeMux()
  184. etcdhttp.HandlePrometheus(mux)
  185. grpcproxy.HandleHealth(mux, client)
  186. lg.Info("gRPC proxy server metrics URL serving")
  187. herr := http.Serve(mhttpl, mux)
  188. if herr != nil {
  189. lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr))
  190. } else {
  191. lg.Info("gRPC proxy server metrics URL returned")
  192. }
  193. }()
  194. }
  195. lg.Info("started gRPC proxy", zap.String("address", grpcProxyListenAddr))
  196. // grpc-proxy is initialized, ready to serve
  197. notifySystemd(lg)
  198. fmt.Fprintln(os.Stderr, <-errc)
  199. os.Exit(1)
  200. }
  201. func checkArgs() {
  202. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  203. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  204. os.Exit(1)
  205. }
  206. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  207. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  208. os.Exit(1)
  209. }
  210. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  211. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  212. os.Exit(1)
  213. }
  214. }
  215. func mustNewClient(lg *zap.Logger) *clientv3.Client {
  216. srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery, grpcProxyDNSClusterServiceName)
  217. eps := srvs.Endpoints
  218. if len(eps) == 0 {
  219. eps = grpcProxyEndpoints
  220. }
  221. cfg, err := newClientCfg(lg, eps)
  222. if err != nil {
  223. fmt.Fprintln(os.Stderr, err)
  224. os.Exit(1)
  225. }
  226. cfg.DialOptions = append(cfg.DialOptions,
  227. grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
  228. cfg.DialOptions = append(cfg.DialOptions,
  229. grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
  230. client, err := clientv3.New(*cfg)
  231. if err != nil {
  232. fmt.Fprintln(os.Stderr, err)
  233. os.Exit(1)
  234. }
  235. return client
  236. }
  237. func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
  238. // set tls if any one tls option set
  239. cfg := clientv3.Config{
  240. Endpoints: eps,
  241. DialTimeout: 5 * time.Second,
  242. }
  243. if grpcMaxCallSendMsgSize > 0 {
  244. cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
  245. }
  246. if grpcMaxCallRecvMsgSize > 0 {
  247. cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
  248. }
  249. tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
  250. if tls == nil && grpcProxyInsecureSkipTLSVerify {
  251. tls = &transport.TLSInfo{}
  252. }
  253. if tls != nil {
  254. clientTLS, err := tls.ClientConfig()
  255. if err != nil {
  256. return nil, err
  257. }
  258. clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
  259. cfg.TLS = clientTLS
  260. lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
  261. }
  262. return &cfg, nil
  263. }
  264. func newTLS(ca, cert, key string) *transport.TLSInfo {
  265. if ca == "" && cert == "" && key == "" {
  266. return nil
  267. }
  268. return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key}
  269. }
  270. func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
  271. l, err := net.Listen("tcp", grpcProxyListenAddr)
  272. if err != nil {
  273. fmt.Fprintln(os.Stderr, err)
  274. os.Exit(1)
  275. }
  276. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  277. fmt.Fprintln(os.Stderr, err)
  278. os.Exit(1)
  279. }
  280. if tlsinfo != nil {
  281. tlsinfo.CRLFile = grpcProxyListenCRL
  282. if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
  283. lg.Fatal("failed to create TLS listener", zap.Error(err))
  284. }
  285. }
  286. lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr))
  287. return cmux.New(l)
  288. }
  289. func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
  290. if grpcProxyEnableOrdering {
  291. vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
  292. client.KV = ordering.NewKV(client.KV, vf)
  293. lg.Info("waiting for linearized read from cluster to recover ordering")
  294. for {
  295. _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
  296. if err == nil {
  297. break
  298. }
  299. lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
  300. time.Sleep(time.Second)
  301. }
  302. }
  303. if len(grpcProxyNamespace) > 0 {
  304. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  305. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  306. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  307. }
  308. if len(grpcProxyLeasing) > 0 {
  309. client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
  310. }
  311. kvp, _ := grpcproxy.NewKvProxy(client)
  312. watchp, _ := grpcproxy.NewWatchProxy(client)
  313. if grpcProxyResolverPrefix != "" {
  314. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  315. }
  316. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  317. leasep, _ := grpcproxy.NewLeaseProxy(client)
  318. mainp := grpcproxy.NewMaintenanceProxy(client)
  319. authp := grpcproxy.NewAuthProxy(client)
  320. electionp := grpcproxy.NewElectionProxy(client)
  321. lockp := grpcproxy.NewLockProxy(client)
  322. server := grpc.NewServer(
  323. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  324. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  325. grpc.MaxConcurrentStreams(math.MaxUint32),
  326. )
  327. pb.RegisterKVServer(server, kvp)
  328. pb.RegisterWatchServer(server, watchp)
  329. pb.RegisterClusterServer(server, clusterp)
  330. pb.RegisterLeaseServer(server, leasep)
  331. pb.RegisterMaintenanceServer(server, mainp)
  332. pb.RegisterAuthServer(server, authp)
  333. v3electionpb.RegisterElectionServer(server, electionp)
  334. v3lockpb.RegisterLockServer(server, lockp)
  335. // set zero values for metrics registered for this grpc server
  336. grpc_prometheus.Register(server)
  337. return server
  338. }
  339. func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
  340. httpmux := http.NewServeMux()
  341. httpmux.HandleFunc("/", http.NotFound)
  342. etcdhttp.HandlePrometheus(httpmux)
  343. grpcproxy.HandleHealth(httpmux, c)
  344. if grpcProxyEnablePprof {
  345. for p, h := range debugutil.PProfHandlers() {
  346. httpmux.Handle(p, h)
  347. }
  348. lg.Info("gRPC proxy enabled pprof", zap.String("path", debugutil.HTTPPrefixPProf))
  349. }
  350. srvhttp := &http.Server{
  351. Handler: httpmux,
  352. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  353. }
  354. if tlsinfo == nil {
  355. return srvhttp, m.Match(cmux.HTTP1())
  356. }
  357. srvTLS, err := tlsinfo.ServerConfig()
  358. if err != nil {
  359. lg.Fatal("failed to set up TLS", zap.Error(err))
  360. }
  361. srvhttp.TLSConfig = srvTLS
  362. return srvhttp, m.Match(cmux.Any())
  363. }
  364. func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
  365. murl, err := url.Parse(grpcProxyMetricsListenAddr)
  366. if err != nil {
  367. fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
  368. os.Exit(1)
  369. }
  370. ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
  371. if err != nil {
  372. fmt.Fprintln(os.Stderr, err)
  373. os.Exit(1)
  374. }
  375. lg.Info("gRPC proxy listening for metrics", zap.String("address", murl.String()))
  376. return ml
  377. }