grpc_proxy.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. "math"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "path/filepath"
  25. "time"
  26. "github.com/coreos/etcd/clientv3"
  27. "github.com/coreos/etcd/clientv3/leasing"
  28. "github.com/coreos/etcd/clientv3/namespace"
  29. "github.com/coreos/etcd/clientv3/ordering"
  30. "github.com/coreos/etcd/etcdserver/api/etcdhttp"
  31. "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  32. "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  33. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  34. "github.com/coreos/etcd/pkg/debugutil"
  35. "github.com/coreos/etcd/pkg/transport"
  36. "github.com/coreos/etcd/proxy/grpcproxy"
  37. "github.com/coreos/pkg/capnslog"
  38. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  39. "github.com/soheilhy/cmux"
  40. "github.com/spf13/cobra"
  41. "google.golang.org/grpc"
  42. "google.golang.org/grpc/grpclog"
  43. )
  44. var (
  45. grpcProxyListenAddr string
  46. grpcProxyMetricsListenAddr string
  47. grpcProxyEndpoints []string
  48. grpcProxyDNSCluster string
  49. grpcProxyInsecureDiscovery bool
  50. grpcProxyDataDir string
  51. grpcMaxCallSendMsgSize int
  52. grpcMaxCallRecvMsgSize int
  53. // tls for connecting to etcd
  54. grpcProxyCA string
  55. grpcProxyCert string
  56. grpcProxyKey string
  57. grpcProxyInsecureSkipTLSVerify bool
  58. // tls for clients connecting to proxy
  59. grpcProxyListenCA string
  60. grpcProxyListenCert string
  61. grpcProxyListenKey string
  62. grpcProxyListenAutoTLS bool
  63. grpcProxyListenCRL string
  64. grpcProxyAdvertiseClientURL string
  65. grpcProxyResolverPrefix string
  66. grpcProxyResolverTTL int
  67. grpcProxyNamespace string
  68. grpcProxyLeasing string
  69. grpcProxyEnablePprof bool
  70. grpcProxyEnableOrdering bool
  71. grpcProxyDebug bool
  72. )
  73. const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
  74. func init() {
  75. rootCmd.AddCommand(newGRPCProxyCommand())
  76. }
  77. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  78. func newGRPCProxyCommand() *cobra.Command {
  79. lpc := &cobra.Command{
  80. Use: "grpc-proxy <subcommand>",
  81. Short: "grpc-proxy related command",
  82. }
  83. lpc.AddCommand(newGRPCProxyStartCommand())
  84. return lpc
  85. }
  86. func newGRPCProxyStartCommand() *cobra.Command {
  87. cmd := cobra.Command{
  88. Use: "start",
  89. Short: "start the grpc proxy",
  90. Run: startGRPCProxy,
  91. }
  92. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  93. cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  94. cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
  95. cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  96. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  97. cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
  98. cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
  99. cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
  100. cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
  101. cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
  102. cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
  103. cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
  104. cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
  105. // client TLS for connecting to server
  106. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  107. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  108. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  109. cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
  110. // client TLS for connecting to proxy
  111. cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
  112. cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
  113. cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
  114. cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
  115. cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
  116. // experimental flags
  117. cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
  118. cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
  119. cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
  120. return &cmd
  121. }
  122. func startGRPCProxy(cmd *cobra.Command, args []string) {
  123. checkArgs()
  124. capnslog.SetGlobalLogLevel(capnslog.INFO)
  125. if grpcProxyDebug {
  126. capnslog.SetGlobalLogLevel(capnslog.DEBUG)
  127. grpc.EnableTracing = true
  128. // enable info, warning, error
  129. grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
  130. } else {
  131. // only discard info
  132. grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
  133. }
  134. tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
  135. if tlsinfo == nil && grpcProxyListenAutoTLS {
  136. host := []string{"https://" + grpcProxyListenAddr}
  137. dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
  138. autoTLS, err := transport.SelfCert(dir, host)
  139. if err != nil {
  140. plog.Fatal(err)
  141. }
  142. tlsinfo = &autoTLS
  143. }
  144. if tlsinfo != nil {
  145. plog.Infof("ServerTLS: %s", tlsinfo)
  146. }
  147. m := mustListenCMux(tlsinfo)
  148. grpcl := m.Match(cmux.HTTP2())
  149. defer func() {
  150. grpcl.Close()
  151. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  152. }()
  153. client := mustNewClient()
  154. srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
  155. errc := make(chan error)
  156. go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
  157. go func() { errc <- srvhttp.Serve(httpl) }()
  158. go func() { errc <- m.Serve() }()
  159. if len(grpcProxyMetricsListenAddr) > 0 {
  160. mhttpl := mustMetricsListener(tlsinfo)
  161. go func() {
  162. mux := http.NewServeMux()
  163. etcdhttp.HandlePrometheus(mux)
  164. grpcproxy.HandleHealth(mux, client)
  165. plog.Fatal(http.Serve(mhttpl, mux))
  166. }()
  167. }
  168. // grpc-proxy is initialized, ready to serve
  169. notifySystemd()
  170. fmt.Fprintln(os.Stderr, <-errc)
  171. os.Exit(1)
  172. }
  173. func checkArgs() {
  174. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
  175. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
  176. os.Exit(1)
  177. }
  178. if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
  179. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
  180. os.Exit(1)
  181. }
  182. if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
  183. fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
  184. os.Exit(1)
  185. }
  186. }
  187. func mustNewClient() *clientv3.Client {
  188. srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
  189. eps := srvs.Endpoints
  190. if len(eps) == 0 {
  191. eps = grpcProxyEndpoints
  192. }
  193. cfg, err := newClientCfg(eps)
  194. if err != nil {
  195. fmt.Fprintln(os.Stderr, err)
  196. os.Exit(1)
  197. }
  198. cfg.DialOptions = append(cfg.DialOptions,
  199. grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
  200. cfg.DialOptions = append(cfg.DialOptions,
  201. grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
  202. client, err := clientv3.New(*cfg)
  203. if err != nil {
  204. fmt.Fprintln(os.Stderr, err)
  205. os.Exit(1)
  206. }
  207. return client
  208. }
  209. func newClientCfg(eps []string) (*clientv3.Config, error) {
  210. // set tls if any one tls option set
  211. cfg := clientv3.Config{
  212. Endpoints: eps,
  213. DialTimeout: 5 * time.Second,
  214. }
  215. if grpcMaxCallSendMsgSize > 0 {
  216. cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
  217. }
  218. if grpcMaxCallRecvMsgSize > 0 {
  219. cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
  220. }
  221. tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
  222. if tls == nil && grpcProxyInsecureSkipTLSVerify {
  223. tls = &transport.TLSInfo{}
  224. }
  225. if tls != nil {
  226. clientTLS, err := tls.ClientConfig()
  227. if err != nil {
  228. return nil, err
  229. }
  230. clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
  231. cfg.TLS = clientTLS
  232. plog.Infof("ClientTLS: %s", tls)
  233. }
  234. return &cfg, nil
  235. }
  236. func newTLS(ca, cert, key string) *transport.TLSInfo {
  237. if ca == "" && cert == "" && key == "" {
  238. return nil
  239. }
  240. return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
  241. }
  242. func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
  243. l, err := net.Listen("tcp", grpcProxyListenAddr)
  244. if err != nil {
  245. fmt.Fprintln(os.Stderr, err)
  246. os.Exit(1)
  247. }
  248. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  249. fmt.Fprintln(os.Stderr, err)
  250. os.Exit(1)
  251. }
  252. if tlsinfo != nil {
  253. tlsinfo.CRLFile = grpcProxyListenCRL
  254. if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
  255. plog.Fatal(err)
  256. }
  257. }
  258. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  259. return cmux.New(l)
  260. }
  261. func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
  262. if grpcProxyEnableOrdering {
  263. vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
  264. client.KV = ordering.NewKV(client.KV, vf)
  265. plog.Infof("waiting for linearized read from cluster to recover ordering")
  266. for {
  267. _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
  268. if err == nil {
  269. break
  270. }
  271. plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err)
  272. time.Sleep(time.Second)
  273. }
  274. }
  275. if len(grpcProxyNamespace) > 0 {
  276. client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
  277. client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
  278. client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
  279. }
  280. if len(grpcProxyLeasing) > 0 {
  281. client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
  282. }
  283. kvp, _ := grpcproxy.NewKvProxy(client)
  284. watchp, _ := grpcproxy.NewWatchProxy(client)
  285. if grpcProxyResolverPrefix != "" {
  286. grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
  287. }
  288. clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
  289. leasep, _ := grpcproxy.NewLeaseProxy(client)
  290. mainp := grpcproxy.NewMaintenanceProxy(client)
  291. authp := grpcproxy.NewAuthProxy(client)
  292. electionp := grpcproxy.NewElectionProxy(client)
  293. lockp := grpcproxy.NewLockProxy(client)
  294. server := grpc.NewServer(
  295. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  296. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  297. grpc.MaxConcurrentStreams(math.MaxUint32),
  298. )
  299. pb.RegisterKVServer(server, kvp)
  300. pb.RegisterWatchServer(server, watchp)
  301. pb.RegisterClusterServer(server, clusterp)
  302. pb.RegisterLeaseServer(server, leasep)
  303. pb.RegisterMaintenanceServer(server, mainp)
  304. pb.RegisterAuthServer(server, authp)
  305. v3electionpb.RegisterElectionServer(server, electionp)
  306. v3lockpb.RegisterLockServer(server, lockp)
  307. // set zero values for metrics registered for this grpc server
  308. grpc_prometheus.Register(server)
  309. return server
  310. }
  311. func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
  312. httpmux := http.NewServeMux()
  313. httpmux.HandleFunc("/", http.NotFound)
  314. etcdhttp.HandlePrometheus(httpmux)
  315. grpcproxy.HandleHealth(httpmux, c)
  316. if grpcProxyEnablePprof {
  317. for p, h := range debugutil.PProfHandlers() {
  318. httpmux.Handle(p, h)
  319. }
  320. plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
  321. }
  322. srvhttp := &http.Server{Handler: httpmux}
  323. if tlsinfo == nil {
  324. return srvhttp, m.Match(cmux.HTTP1())
  325. }
  326. srvTLS, err := tlsinfo.ServerConfig()
  327. if err != nil {
  328. plog.Fatalf("could not setup TLS (%v)", err)
  329. }
  330. srvhttp.TLSConfig = srvTLS
  331. return srvhttp, m.Match(cmux.Any())
  332. }
  333. func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
  334. murl, err := url.Parse(grpcProxyMetricsListenAddr)
  335. if err != nil {
  336. fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
  337. os.Exit(1)
  338. }
  339. ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
  340. if err != nil {
  341. fmt.Fprintln(os.Stderr, err)
  342. os.Exit(1)
  343. }
  344. plog.Info("grpc-proxy: listening for metrics on ", murl.String())
  345. return ml
  346. }