grpc_proxy.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "fmt"
  17. "net"
  18. "net/http"
  19. "os"
  20. "time"
  21. "github.com/coreos/etcd/clientv3"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "github.com/coreos/etcd/pkg/transport"
  24. "github.com/coreos/etcd/proxy/grpcproxy"
  25. "github.com/spf13/cobra"
  26. "google.golang.org/grpc"
  27. "github.com/cockroachdb/cmux"
  28. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  29. "github.com/prometheus/client_golang/prometheus"
  30. )
  31. var (
  32. grpcProxyListenAddr string
  33. grpcProxyEndpoints []string
  34. grpcProxyCert string
  35. grpcProxyKey string
  36. grpcProxyCA string
  37. )
  38. func init() {
  39. rootCmd.AddCommand(newGRPCProxyCommand())
  40. }
  41. // newGRPCProxyCommand returns the cobra command for "grpc-proxy".
  42. func newGRPCProxyCommand() *cobra.Command {
  43. lpc := &cobra.Command{
  44. Use: "grpc-proxy <subcommand>",
  45. Short: "grpc-proxy related command",
  46. }
  47. lpc.AddCommand(newGRPCProxyStartCommand())
  48. return lpc
  49. }
  50. func newGRPCProxyStartCommand() *cobra.Command {
  51. cmd := cobra.Command{
  52. Use: "start",
  53. Short: "start the grpc proxy",
  54. Run: startGRPCProxy,
  55. }
  56. cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  57. cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  58. cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
  59. cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
  60. cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
  61. return &cmd
  62. }
  63. func startGRPCProxy(cmd *cobra.Command, args []string) {
  64. l, err := net.Listen("tcp", grpcProxyListenAddr)
  65. if err != nil {
  66. fmt.Fprintln(os.Stderr, err)
  67. os.Exit(1)
  68. }
  69. if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
  70. fmt.Fprintln(os.Stderr, err)
  71. os.Exit(1)
  72. }
  73. plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  74. defer func() {
  75. l.Close()
  76. plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
  77. }()
  78. m := cmux.New(l)
  79. cfg, err := newClientCfg()
  80. if err != nil {
  81. fmt.Fprintln(os.Stderr, err)
  82. os.Exit(1)
  83. }
  84. client, err := clientv3.New(*cfg)
  85. if err != nil {
  86. fmt.Fprintln(os.Stderr, err)
  87. os.Exit(1)
  88. }
  89. kvp := grpcproxy.NewKvProxy(client)
  90. watchp := grpcproxy.NewWatchProxy(client)
  91. clusterp := grpcproxy.NewClusterProxy(client)
  92. leasep := grpcproxy.NewLeaseProxy(client)
  93. mainp := grpcproxy.NewMaintenanceProxy(client)
  94. authp := grpcproxy.NewAuthProxy(client)
  95. server := grpc.NewServer(
  96. grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
  97. grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
  98. )
  99. pb.RegisterKVServer(server, kvp)
  100. pb.RegisterWatchServer(server, watchp)
  101. pb.RegisterClusterServer(server, clusterp)
  102. pb.RegisterLeaseServer(server, leasep)
  103. pb.RegisterMaintenanceServer(server, mainp)
  104. pb.RegisterAuthServer(server, authp)
  105. grpc_prometheus.Register(server)
  106. errc := make(chan error)
  107. grpcl := m.Match(cmux.HTTP2())
  108. go func() { errc <- server.Serve(grpcl) }()
  109. httpmux := http.NewServeMux()
  110. httpmux.HandleFunc("/", http.NotFound)
  111. httpmux.Handle("/metrics", prometheus.Handler())
  112. srvhttp := &http.Server{
  113. Handler: httpmux,
  114. }
  115. httpl := m.Match(cmux.HTTP1())
  116. go func() { errc <- srvhttp.Serve(httpl) }()
  117. go func() { errc <- m.Serve() }()
  118. fmt.Fprintln(os.Stderr, <-errc)
  119. os.Exit(1)
  120. }
  121. func newClientCfg() (*clientv3.Config, error) {
  122. // set tls if any one tls option set
  123. var cfgtls *transport.TLSInfo
  124. tlsinfo := transport.TLSInfo{}
  125. if grpcProxyCert != "" {
  126. tlsinfo.CertFile = grpcProxyCert
  127. cfgtls = &tlsinfo
  128. }
  129. if grpcProxyKey != "" {
  130. tlsinfo.KeyFile = grpcProxyKey
  131. cfgtls = &tlsinfo
  132. }
  133. if grpcProxyCA != "" {
  134. tlsinfo.CAFile = grpcProxyCA
  135. cfgtls = &tlsinfo
  136. }
  137. cfg := clientv3.Config{
  138. Endpoints: grpcProxyEndpoints,
  139. DialTimeout: 5 * time.Second,
  140. }
  141. if cfgtls != nil {
  142. clientTLS, err := cfgtls.ClientConfig()
  143. if err != nil {
  144. return nil, err
  145. }
  146. cfg.TLS = clientTLS
  147. }
  148. // TODO: support insecure tls
  149. return &cfg, nil
  150. }