|
@@ -18,6 +18,7 @@
|
|
|
package etcdmain
|
|
package etcdmain
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "crypto/tls"
|
|
|
"encoding/json"
|
|
"encoding/json"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
@@ -33,7 +34,6 @@ import (
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/discovery"
|
|
"github.com/coreos/etcd/discovery"
|
|
|
"github.com/coreos/etcd/etcdserver"
|
|
"github.com/coreos/etcd/etcdserver"
|
|
|
- "github.com/coreos/etcd/etcdserver/api/v3rpc"
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
|
|
"github.com/coreos/etcd/pkg/cors"
|
|
"github.com/coreos/etcd/pkg/cors"
|
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
@@ -49,7 +49,6 @@ import (
|
|
|
systemdutil "github.com/coreos/go-systemd/util"
|
|
systemdutil "github.com/coreos/go-systemd/util"
|
|
|
"github.com/coreos/pkg/capnslog"
|
|
"github.com/coreos/pkg/capnslog"
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
- "google.golang.org/grpc"
|
|
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
type dirType string
|
|
type dirType string
|
|
@@ -220,14 +219,24 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|
|
if !cfg.peerTLSInfo.Empty() {
|
|
if !cfg.peerTLSInfo.Empty() {
|
|
|
plog.Infof("peerTLS: %s", cfg.peerTLSInfo)
|
|
plog.Infof("peerTLS: %s", cfg.peerTLSInfo)
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
plns := make([]net.Listener, 0)
|
|
plns := make([]net.Listener, 0)
|
|
|
for _, u := range cfg.lpurls {
|
|
for _, u := range cfg.lpurls {
|
|
|
if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
|
|
if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
|
|
|
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
|
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
|
|
}
|
|
}
|
|
|
- var l net.Listener
|
|
|
|
|
- l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
|
|
|
|
|
|
|
+ var (
|
|
|
|
|
+ l net.Listener
|
|
|
|
|
+ tlscfg *tls.Config
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if !cfg.peerTLSInfo.Empty() {
|
|
|
|
|
+ tlscfg, err = cfg.peerTLSInfo.ServerConfig()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ l, err = rafthttp.NewListener(u, tlscfg)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
@@ -243,15 +252,40 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|
|
plns = append(plns, l)
|
|
plns = append(plns, l)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ var ctlscfg *tls.Config
|
|
|
if !cfg.clientTLSInfo.Empty() {
|
|
if !cfg.clientTLSInfo.Empty() {
|
|
|
plog.Infof("clientTLS: %s", cfg.clientTLSInfo)
|
|
plog.Infof("clientTLS: %s", cfg.clientTLSInfo)
|
|
|
|
|
+ ctlscfg, err = cfg.clientTLSInfo.ServerConfig()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- clns := make([]net.Listener, 0)
|
|
|
|
|
|
|
+ sctxs := make(map[string]*serveCtx)
|
|
|
for _, u := range cfg.lcurls {
|
|
for _, u := range cfg.lcurls {
|
|
|
if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
|
|
if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
|
|
|
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
|
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ ctx := &serveCtx{host: u.Host}
|
|
|
|
|
+
|
|
|
|
|
+ if u.Scheme == "https" {
|
|
|
|
|
+ ctx.secure = true
|
|
|
|
|
+ } else {
|
|
|
|
|
+ ctx.insecure = true
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if sctxs[u.Host] != nil {
|
|
|
|
|
+ if ctx.secure {
|
|
|
|
|
+ sctxs[u.Host].secure = true
|
|
|
|
|
+ }
|
|
|
|
|
+ if ctx.insecure {
|
|
|
|
|
+ sctxs[u.Host].insecure = true
|
|
|
|
|
+ }
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
var l net.Listener
|
|
var l net.Listener
|
|
|
|
|
+
|
|
|
l, err = net.Listen("tcp", u.Host)
|
|
l, err = net.Listen("tcp", u.Host)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
@@ -265,22 +299,20 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|
|
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
|
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Do not wrap around this listener if TLS Info is set.
|
|
|
|
|
- // HTTPS server expects TLS Conn created by TLSListener.
|
|
|
|
|
- l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
|
|
|
|
|
|
|
+ l, err = transport.NewKeepAliveListener(l, "tcp", nil)
|
|
|
|
|
+ ctx.l = l
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- urlStr := u.String()
|
|
|
|
|
- plog.Info("listening for client requests on ", urlStr)
|
|
|
|
|
|
|
+ plog.Info("listening for client requests on ", u.Host)
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
l.Close()
|
|
l.Close()
|
|
|
- plog.Info("stopping listening for client requests on ", urlStr)
|
|
|
|
|
|
|
+ plog.Info("stopping listening for client requests on ", u.Host)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
- clns = append(clns, l)
|
|
|
|
|
|
|
+ sctxs[u.Host] = ctx
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
srvcfg := &etcdserver.ServerConfig{
|
|
srvcfg := &etcdserver.ServerConfig{
|
|
@@ -317,40 +349,25 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|
|
if cfg.corsInfo.String() != "" {
|
|
if cfg.corsInfo.String() != "" {
|
|
|
plog.Infof("cors = %s", cfg.corsInfo)
|
|
plog.Infof("cors = %s", cfg.corsInfo)
|
|
|
}
|
|
}
|
|
|
- ch := &cors.CORSHandler{
|
|
|
|
|
|
|
+ ch := http.Handler(&cors.CORSHandler{
|
|
|
Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
|
|
Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
|
|
|
Info: cfg.corsInfo,
|
|
Info: cfg.corsInfo,
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
ph := etcdhttp.NewPeerHandler(s)
|
|
ph := etcdhttp.NewPeerHandler(s)
|
|
|
|
|
|
|
|
- var grpcS *grpc.Server
|
|
|
|
|
- if cfg.v3demo {
|
|
|
|
|
- // set up v3 demo rpc
|
|
|
|
|
- tls := &cfg.clientTLSInfo
|
|
|
|
|
- if cfg.clientTLSInfo.Empty() {
|
|
|
|
|
- tls = nil
|
|
|
|
|
- }
|
|
|
|
|
- grpcS, err = v3rpc.Server(s, tls)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- s.Stop()
|
|
|
|
|
- <-s.StopNotify()
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// Start the peer server in a goroutine
|
|
// Start the peer server in a goroutine
|
|
|
for _, l := range plns {
|
|
for _, l := range plns {
|
|
|
go func(l net.Listener) {
|
|
go func(l net.Listener) {
|
|
|
- plog.Fatal(serve(l, nil, ph, 5*time.Minute))
|
|
|
|
|
|
|
+ plog.Fatal(servePeerHTTP(l, ph))
|
|
|
}(l)
|
|
}(l)
|
|
|
}
|
|
}
|
|
|
// Start a client server goroutine for each listen address
|
|
// Start a client server goroutine for each listen address
|
|
|
- for _, l := range clns {
|
|
|
|
|
- go func(l net.Listener) {
|
|
|
|
|
|
|
+ for _, sctx := range sctxs {
|
|
|
|
|
+ go func(sctx *serveCtx) {
|
|
|
// read timeout does not work with http close notify
|
|
// read timeout does not work with http close notify
|
|
|
// TODO: https://github.com/golang/go/issues/9524
|
|
// TODO: https://github.com/golang/go/issues/9524
|
|
|
- plog.Fatal(serve(l, grpcS, ch, 0))
|
|
|
|
|
- }(l)
|
|
|
|
|
|
|
+ plog.Fatal(serve(sctx, s, ctlscfg, ch))
|
|
|
|
|
+ }(sctx)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return s.StopNotify(), nil
|
|
return s.StopNotify(), nil
|
|
@@ -419,11 +436,11 @@ func startProxy(cfg *config) error {
|
|
|
|
|
|
|
|
clientURLs := []string{}
|
|
clientURLs := []string{}
|
|
|
uf := func() []string {
|
|
uf := func() []string {
|
|
|
- gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
|
|
|
|
|
|
+ gcls, gerr := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
|
|
// TODO: remove the 2nd check when we fix GetClusterFromRemotePeers
|
|
// TODO: remove the 2nd check when we fix GetClusterFromRemotePeers
|
|
|
// GetClusterFromRemotePeers should not return nil error with an invalid empty cluster
|
|
// GetClusterFromRemotePeers should not return nil error with an invalid empty cluster
|
|
|
- if err != nil {
|
|
|
|
|
- plog.Warningf("proxy: %v", err)
|
|
|
|
|
|
|
+ if gerr != nil {
|
|
|
|
|
+ plog.Warningf("proxy: %v", gerr)
|
|
|
return []string{}
|
|
return []string{}
|
|
|
}
|
|
}
|
|
|
if len(gcls.Members()) == 0 {
|
|
if len(gcls.Members()) == 0 {
|
|
@@ -432,9 +449,9 @@ func startProxy(cfg *config) error {
|
|
|
clientURLs = gcls.ClientURLs()
|
|
clientURLs = gcls.ClientURLs()
|
|
|
|
|
|
|
|
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
|
|
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
|
|
|
- b, err := json.Marshal(urls)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- plog.Warningf("proxy: error on marshal peer urls %s", err)
|
|
|
|
|
|
|
+ b, jerr := json.Marshal(urls)
|
|
|
|
|
+ if jerr != nil {
|
|
|
|
|
+ plog.Warningf("proxy: error on marshal peer urls %s", jerr)
|
|
|
return clientURLs
|
|
return clientURLs
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -466,7 +483,18 @@ func startProxy(cfg *config) error {
|
|
|
}
|
|
}
|
|
|
// Start a proxy server goroutine for each listen address
|
|
// Start a proxy server goroutine for each listen address
|
|
|
for _, u := range cfg.lcurls {
|
|
for _, u := range cfg.lcurls {
|
|
|
- l, err := transport.NewListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
|
|
|
|
|
|
+ var (
|
|
|
|
|
+ l net.Listener
|
|
|
|
|
+ tlscfg *tls.Config
|
|
|
|
|
+ )
|
|
|
|
|
+ if !cfg.clientTLSInfo.Empty() {
|
|
|
|
|
+ tlscfg, err = cfg.clientTLSInfo.ServerConfig()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ l, err := transport.NewListener(u.Host, u.Scheme, tlscfg)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|