serve.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package embed
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. defaultLog "log"
  20. "net"
  21. "net/http"
  22. "strings"
  23. "go.etcd.io/etcd/clientv3/credentials"
  24. "go.etcd.io/etcd/etcdserver"
  25. "go.etcd.io/etcd/etcdserver/api/v3client"
  26. "go.etcd.io/etcd/etcdserver/api/v3election"
  27. "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  28. v3electiongw "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb/gw"
  29. "go.etcd.io/etcd/etcdserver/api/v3lock"
  30. "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  31. v3lockgw "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb/gw"
  32. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  33. etcdservergw "go.etcd.io/etcd/etcdserver/etcdserverpb/gw"
  34. "go.etcd.io/etcd/pkg/debugutil"
  35. "go.etcd.io/etcd/pkg/httputil"
  36. "go.etcd.io/etcd/pkg/transport"
  37. gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
  38. "github.com/soheilhy/cmux"
  39. "github.com/tmc/grpc-websocket-proxy/wsproxy"
  40. "go.uber.org/zap"
  41. "golang.org/x/net/trace"
  42. "google.golang.org/grpc"
  43. )
// serveCtx carries the listener, addressing information, serving-mode flags,
// handlers and lifecycle state that serve uses to bring up the client-facing
// gRPC and HTTP servers.
type serveCtx struct {
	// lg is the structured logger; when it is nil the methods of serveCtx
	// log through the package-level plog instead.
	lg *zap.Logger
	// l is the listener that serve multiplexes with cmux.
	l net.Listener
	// addr is the target registerGateway dials; when network is "unix" the
	// target is rewritten to "unix://<addr>".
	addr    string
	network string
	// secure and insecure select which branches of serve run. They are
	// independent flags, so both branches may run for the same listener.
	secure   bool
	insecure bool
	// ctx is handed to the gateway dial and handler registration; once it is
	// done the gateway connection is closed. cancel is the matching cancel
	// function created alongside ctx in newServeCtx.
	ctx    context.Context
	cancel context.CancelFunc
	// userHandlers maps URL patterns to extra handlers that createMux
	// installs on the HTTP mux.
	userHandlers map[string]http.Handler
	// serviceRegister, when non-nil, is called by serve with each gRPC
	// server it creates.
	serviceRegister func(*grpc.Server)
	// serversC receives one entry per serving mode started by serve and is
	// closed by serve just before it starts the cmux loop.
	serversC chan *servers
}
// servers groups the gRPC and HTTP servers that serve starts together for one
// serving mode and publishes on serveCtx.serversC.
type servers struct {
	// secure is true for the pair created by the TLS branch of serve and
	// left false for the pair created by the insecure branch.
	secure bool
	grpc   *grpc.Server
	http   *http.Server
}
  62. func newServeCtx(lg *zap.Logger) *serveCtx {
  63. ctx, cancel := context.WithCancel(context.Background())
  64. return &serveCtx{
  65. lg: lg,
  66. ctx: ctx,
  67. cancel: cancel,
  68. userHandlers: make(map[string]http.Handler),
  69. serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
  70. }
  71. }
// serve blocks until the etcd server s reports ready, then starts the
// client-facing gRPC and HTTP servers on top of the listener sctx.l and
// finally runs the cmux multiplexer, returning whatever m.Serve returns.
//
// Depending on sctx.insecure and sctx.secure it starts a plaintext server
// pair, a TLS server pair, or both. Each started pair is published on
// sctx.serversC, and the channel is closed once all requested pairs are up.
//
// Parameters:
//   - s: the etcd server backing the v3 gRPC services, the election and lock
//     services, and the access controller.
//   - tlsinfo: source of the server TLS configuration; only consulted when
//     sctx.secure is set.
//   - handler: HTTP handler mounted at "/" by createMux; in the secure branch
//     it is first wrapped by grpcHandlerFunc.
//   - errHandler: called from a separate goroutine with the result of every
//     gRPC/HTTP Serve call.
//   - gopts: extra options passed to v3rpc.Server.
//
// If serve returns a non-nil error, the deferred cleanup stops the gRPC
// server currently held in gs.
//
// NOTE(review): the early error returns happen before close(sctx.serversC),
// so the channel stays open on those paths — confirm that consumers of
// serversC cannot block on it after a failed serve.
// NOTE(review): gs is reassigned in the secure branch, so when both modes are
// enabled the deferred cleanup only stops the most recently assigned server —
// confirm the insecure pair is stopped elsewhere.
func (sctx *serveCtx) serve(
	s *etcdserver.EtcdServer,
	tlsinfo *transport.TLSInfo,
	handler http.Handler,
	errHandler func(error),
	gopts ...grpc.ServerOption) (err error) {
	// Errors reported by the http.Server instances below are written to a
	// discarding logger.
	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
	<-s.ReadyNotify()

	// Only the legacy logger announces readiness here; nothing is logged at
	// this point when a zap logger is configured.
	if sctx.lg == nil {
		plog.Info("ready to serve client requests")
	}

	m := cmux.New(sctx.l)
	v3c := v3client.New(s)
	servElection := v3election.NewElectionServer(v3c)
	servLock := v3lock.NewLockServer(v3c)

	var gs *grpc.Server
	// err is the named result, so this observes the error of every return
	// statement below, including the one carrying m.Serve's result.
	defer func() {
		if err != nil && gs != nil {
			gs.Stop()
		}
	}()

	if sctx.insecure {
		gs = v3rpc.Server(s, nil, gopts...)
		v3electionpb.RegisterElectionServer(gs, servElection)
		v3lockpb.RegisterLockServer(gs, servLock)
		if sctx.serviceRegister != nil {
			sctx.serviceRegister(gs)
		}
		// Plaintext mode splits traffic at the connection level: HTTP/2
		// connections go to gRPC, HTTP/1 connections to the HTTP server.
		grpcl := m.Match(cmux.HTTP2())
		// NOTE(review): the closure reads the shared variable gs when it
		// runs, and gs is reassigned in the secure branch below — confirm
		// this cannot end up serving the secure server on grpcl.
		go func() { errHandler(gs.Serve(grpcl)) }()

		var gwmux *gw.ServeMux
		if s.Cfg.EnableGRPCGateway {
			gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()})
			if err != nil {
				return err
			}
		}

		httpmux := sctx.createMux(gwmux, handler)

		srvhttp := &http.Server{
			Handler:  createAccessController(sctx.lg, s, httpmux),
			ErrorLog: logger, // do not log user error
		}
		httpl := m.Match(cmux.HTTP1())
		go func() { errHandler(srvhttp.Serve(httpl)) }()

		sctx.serversC <- &servers{grpc: gs, http: srvhttp}
		if sctx.lg != nil {
			sctx.lg.Info(
				"serving client traffic insecurely; this is strongly discouraged!",
				zap.String("address", sctx.l.Addr().String()),
			)
		} else {
			plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
		}
	}

	if sctx.secure {
		tlscfg, tlsErr := tlsinfo.ServerConfig()
		if tlsErr != nil {
			return tlsErr
		}
		gs = v3rpc.Server(s, tlscfg, gopts...)
		v3electionpb.RegisterElectionServer(gs, servElection)
		v3lockpb.RegisterLockServer(gs, servLock)
		if sctx.serviceRegister != nil {
			sctx.serviceRegister(gs)
		}
		// In TLS mode a single HTTP server receives every connection, so gRPC
		// requests are separated from other requests per request rather than
		// per connection.
		handler = grpcHandlerFunc(gs, handler)

		var gwmux *gw.ServeMux
		if s.Cfg.EnableGRPCGateway {
			dtls := tlscfg.Clone()
			// trust local server
			dtls.InsecureSkipVerify = true
			bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
			opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())}
			gwmux, err = sctx.registerGateway(opts)
			if err != nil {
				return err
			}
		}

		var tlsl net.Listener
		tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
		if err != nil {
			return err
		}
		// TODO: add debug flag; enable logging when debug flag is set
		httpmux := sctx.createMux(gwmux, handler)

		srv := &http.Server{
			Handler:   createAccessController(sctx.lg, s, httpmux),
			TLSConfig: tlscfg,
			ErrorLog:  logger, // do not log user error
		}
		go func() { errHandler(srv.Serve(tlsl)) }()

		sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
		if sctx.lg != nil {
			sctx.lg.Info(
				"serving client traffic securely",
				zap.String("address", sctx.l.Addr().String()),
			)
		} else {
			plog.Infof("serving client requests on %s", sctx.l.Addr().String())
		}
	}

	// Every requested server pair has been published; closing the channel
	// ends any range loop over serversC.
	close(sctx.serversC)
	return m.Serve()
}
  179. // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
  180. // connections or otherHandler otherwise. Given in gRPC docs.
  181. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
  182. if otherHandler == nil {
  183. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  184. grpcServer.ServeHTTP(w, r)
  185. })
  186. }
  187. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  188. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  189. grpcServer.ServeHTTP(w, r)
  190. } else {
  191. otherHandler.ServeHTTP(w, r)
  192. }
  193. })
  194. }
// registerHandlerFunc is the common signature of the gateway registration
// functions that registerGateway calls: each takes a context, the gateway mux
// to register on, and the gRPC client connection, and returns an error.
type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
  196. func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
  197. ctx := sctx.ctx
  198. addr := sctx.addr
  199. if network := sctx.network; network == "unix" {
  200. // explicitly define unix network for gRPC socket support
  201. addr = fmt.Sprintf("%s://%s", network, addr)
  202. }
  203. conn, err := grpc.DialContext(ctx, addr, opts...)
  204. if err != nil {
  205. return nil, err
  206. }
  207. gwmux := gw.NewServeMux()
  208. handlers := []registerHandlerFunc{
  209. etcdservergw.RegisterKVHandler,
  210. etcdservergw.RegisterWatchHandler,
  211. etcdservergw.RegisterLeaseHandler,
  212. etcdservergw.RegisterClusterHandler,
  213. etcdservergw.RegisterMaintenanceHandler,
  214. etcdservergw.RegisterAuthHandler,
  215. v3lockgw.RegisterLockHandler,
  216. v3electiongw.RegisterElectionHandler,
  217. }
  218. for _, h := range handlers {
  219. if err := h(ctx, gwmux, conn); err != nil {
  220. return nil, err
  221. }
  222. }
  223. go func() {
  224. <-ctx.Done()
  225. if cerr := conn.Close(); cerr != nil {
  226. if sctx.lg != nil {
  227. sctx.lg.Warn(
  228. "failed to close connection",
  229. zap.String("address", sctx.l.Addr().String()),
  230. zap.Error(cerr),
  231. )
  232. } else {
  233. plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr)
  234. }
  235. }
  236. }()
  237. return gwmux, nil
  238. }
  239. func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
  240. httpmux := http.NewServeMux()
  241. for path, h := range sctx.userHandlers {
  242. httpmux.Handle(path, h)
  243. }
  244. if gwmux != nil {
  245. httpmux.Handle(
  246. "/v3/",
  247. wsproxy.WebsocketProxy(
  248. gwmux,
  249. wsproxy.WithRequestMutator(
  250. // Default to the POST method for streams
  251. func(_ *http.Request, outgoing *http.Request) *http.Request {
  252. outgoing.Method = "POST"
  253. return outgoing
  254. },
  255. ),
  256. ),
  257. )
  258. }
  259. if handler != nil {
  260. httpmux.Handle("/", handler)
  261. }
  262. return httpmux
  263. }
  264. // createAccessController wraps HTTP multiplexer:
  265. // - mutate gRPC gateway request paths
  266. // - check hostname whitelist
  267. // client HTTP requests goes here first
  268. func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
  269. return &accessController{lg: lg, s: s, mux: mux}
  270. }
// accessController is the http.Handler that screens client HTTP requests
// before they reach mux.
type accessController struct {
	// lg is the structured logger; when nil, rejections are logged via plog.
	lg *zap.Logger
	// s supplies the host whitelist, CORS origins, auth state and config
	// flags consulted by ServeHTTP.
	s *etcdserver.EtcdServer
	// mux receives every request that passes the checks.
	mux *http.ServeMux
}
  276. func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
  277. // redirect for backward compatibilities
  278. if req != nil && req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3beta/") {
  279. req.URL.Path = strings.Replace(req.URL.Path, "/v3beta/", "/v3/", 1)
  280. }
  281. if req.TLS == nil { // check origin if client connection is not secure
  282. host := httputil.GetHostname(req)
  283. if !ac.s.AccessController.IsHostWhitelisted(host) {
  284. if ac.lg != nil {
  285. ac.lg.Warn(
  286. "rejecting HTTP request to prevent DNS rebinding attacks",
  287. zap.String("host", host),
  288. )
  289. } else {
  290. plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host)
  291. }
  292. // TODO: use Go's "http.StatusMisdirectedRequest" (421)
  293. // https://github.com/golang/go/commit/4b8a7eafef039af1834ef9bfa879257c4a72b7b5
  294. http.Error(rw, errCVE20185702(host), 421)
  295. return
  296. }
  297. } else if ac.s.Cfg.ClientCertAuthEnabled && ac.s.Cfg.EnableGRPCGateway &&
  298. ac.s.AuthStore().IsAuthEnabled() && strings.HasPrefix(req.URL.Path, "/v3/") {
  299. for _, chains := range req.TLS.VerifiedChains {
  300. if len(chains) < 1 {
  301. continue
  302. }
  303. if len(chains[0].Subject.CommonName) != 0 {
  304. http.Error(rw, "CommonName of client sending a request against gateway will be ignored and not used as expected", 400)
  305. return
  306. }
  307. }
  308. }
  309. // Write CORS header.
  310. if ac.s.AccessController.OriginAllowed("*") {
  311. addCORSHeader(rw, "*")
  312. } else if origin := req.Header.Get("Origin"); ac.s.OriginAllowed(origin) {
  313. addCORSHeader(rw, origin)
  314. }
  315. if req.Method == "OPTIONS" {
  316. rw.WriteHeader(http.StatusOK)
  317. return
  318. }
  319. ac.mux.ServeHTTP(rw, req)
  320. }
  321. // addCORSHeader adds the correct cors headers given an origin
  322. func addCORSHeader(w http.ResponseWriter, origin string) {
  323. w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
  324. w.Header().Add("Access-Control-Allow-Origin", origin)
  325. w.Header().Add("Access-Control-Allow-Headers", "accept, content-type, authorization")
  326. }
// errCVE20185702 returns the plain-text response body used when a request is
// rejected because its host is not whitelisted. The rejected host is
// interpolated, quoted via %q, into the suggested --host-whitelist flag.
//
// See also: https://github.com/transmission/transmission/pull/468
func errCVE20185702(host string) string {
	return fmt.Sprintf(`
etcd received your request, but the Host header was unrecognized.
To fix this, choose one of the following options:
- Enable TLS, then any HTTPS request will be allowed.
- Add the hostname you want to use to the whitelist in settings.
- e.g. etcd --host-whitelist %q
This requirement has been added to help prevent "DNS Rebinding" attacks (CVE-2018-5702).
`, host)
}
  338. // WrapCORS wraps existing handler with CORS.
  339. // TODO: deprecate this after v2 proxy deprecate
  340. func WrapCORS(cors map[string]struct{}, h http.Handler) http.Handler {
  341. return &corsHandler{
  342. ac: &etcdserver.AccessController{CORS: cors},
  343. h: h,
  344. }
  345. }
// corsHandler is the http.Handler returned by WrapCORS: it adds CORS headers
// according to ac and then forwards the request to h.
type corsHandler struct {
	// ac decides which origins are allowed.
	ac *etcdserver.AccessController
	// h is the wrapped handler.
	h http.Handler
}
  350. func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
  351. if ch.ac.OriginAllowed("*") {
  352. addCORSHeader(rw, "*")
  353. } else if origin := req.Header.Get("Origin"); ch.ac.OriginAllowed(origin) {
  354. addCORSHeader(rw, origin)
  355. }
  356. if req.Method == "OPTIONS" {
  357. rw.WriteHeader(http.StatusOK)
  358. return
  359. }
  360. ch.h.ServeHTTP(rw, req)
  361. }
  362. func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
  363. if sctx.userHandlers[s] != nil {
  364. if sctx.lg != nil {
  365. sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
  366. } else {
  367. plog.Warningf("path %s already registered by user handler", s)
  368. }
  369. return
  370. }
  371. sctx.userHandlers[s] = h
  372. }
  373. func (sctx *serveCtx) registerPprof() {
  374. for p, h := range debugutil.PProfHandlers() {
  375. sctx.registerUserHandler(p, h)
  376. }
  377. }
  378. func (sctx *serveCtx) registerTrace() {
  379. reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
  380. sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
  381. evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
  382. sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
  383. }