Bläddra i källkod

*: gRPC + HTTP on the same port

We use cmux to do this since we want to do http+https on the same
port in the near future too.
Xiang Li 9 år sedan
förälder
incheckning
d3809abe42

+ 3 - 3
V3DemoProcfile

@@ -1,7 +1,7 @@
 # Use goreman to run `go get github.com/mattn/goreman`
 # etcd1 is the default client server for etcdctlv3 commands
-etcd1: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:2378 --name infra1 --listen-client-urls http://127.0.0.1:12379 --advertise-client-urls http://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
-etcd2: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:22378 --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
-etcd3: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:32378 --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
+etcd1: bin/etcd --experimental-v3demo=true --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
+etcd2: bin/etcd --experimental-v3demo=true --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
+etcd3: bin/etcd --experimental-v3demo=true --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
 # in future, use proxy to listen on 2379
 #proxy: bin/etcd --name infra-proxy1 --proxy=on --listen-client-urls http://127.0.0.1:2378 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --enable-pprof

+ 1 - 1
clientv3/README.md

@@ -16,7 +16,7 @@ Create client using `clientv3.New`:
 
 ```go
 cli, err := clientv3.New(clientv3.Config{
-	Endpoints:   []string{"localhost:12378", "localhost:22378", "localhost:32378"},
+	Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
 	DialTimeout: 5 * time.Second,
 })
 if err != nil {

+ 1 - 1
clientv3/doc.go

@@ -17,7 +17,7 @@
 // Create client using `clientv3.New`:
 //
 //	cli, err := clientv3.New(clientv3.Config{
-//		Endpoints:   []string{"localhost:12378", "localhost:22378", "localhost:32378"},
+//		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
 //		DialTimeout: 5 * time.Second,
 //	})
 //	if err != nil {

+ 1 - 1
clientv3/example_cluster_test.go

@@ -113,7 +113,7 @@ func ExampleCluster_memberUpdate() {
 		log.Fatal(err)
 	}
 
-	peerURLs := []string{"http://localhost:12378"}
+	peerURLs := []string{"http://localhost:12380"}
 	_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
 	if err != nil {
 		log.Fatal(err)

+ 1 - 1
clientv3/example_test.go

@@ -25,7 +25,7 @@ import (
 var (
 	dialTimeout    = 5 * time.Second
 	requestTimeout = 1 * time.Second
-	endpoints      = []string{"localhost:2378", "localhost:22378", "http://localhost:32380"}
+	endpoints      = []string{"localhost:2379", "localhost:22379", "http://localhost:32379"}
 )
 
 func Example() {

+ 1 - 1
etcdctlv3/main.go

@@ -41,7 +41,7 @@ var (
 )
 
 func init() {
-	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2378", "127.0.0.1:22378", "127.0.0.1:32378"}, "gRPC endpoints")
+	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}, "gRPC endpoints")
 
 	rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (simple, json, protobuf)")
 	rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")

+ 0 - 2
etcdmain/config.go

@@ -123,7 +123,6 @@ type config struct {
 	printVersion bool
 
 	v3demo                  bool
-	gRPCAddr                string
 	autoCompactionRetention int
 
 	enablePprof bool
@@ -226,7 +225,6 @@ func NewConfig() *config {
 
 	// demo flag
 	fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.")
-	fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.")
 	fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.")
 
 	// backwards-compatibility with v0.4.6

+ 18 - 25
etcdmain/etcd.go

@@ -35,6 +35,7 @@ import (
 	systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
@@ -281,15 +282,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		clns = append(clns, l)
 	}
 
-	var v3l net.Listener
-	if cfg.v3demo {
-		v3l, err = net.Listen("tcp", cfg.gRPCAddr)
-		if err != nil {
-			plog.Fatal(err)
-		}
-		plog.Infof("listening for client rpc on %s", cfg.gRPCAddr)
-	}
-
 	srvcfg := &etcdserver.ServerConfig{
 		Name:                    cfg.name,
 		ClientURLs:              cfg.acurls,
@@ -329,34 +321,35 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Info:    cfg.corsInfo,
 	}
 	ph := etcdhttp.NewPeerHandler(s)
-	// Start the peer server in a goroutine
-	for _, l := range plns {
-		go func(l net.Listener) {
-			plog.Fatal(serveHTTP(l, ph, 5*time.Minute))
-		}(l)
-	}
-	// Start a client server goroutine for each listen address
-	for _, l := range clns {
-		go func(l net.Listener) {
-			// read timeout does not work with http close notify
-			// TODO: https://github.com/golang/go/issues/9524
-			plog.Fatal(serveHTTP(l, ch, 0))
-		}(l)
-	}
 
+	var grpcS *grpc.Server
 	if cfg.v3demo {
 		// set up v3 demo rpc
 		tls := &cfg.clientTLSInfo
 		if cfg.clientTLSInfo.Empty() {
 			tls = nil
 		}
-		grpcServer, err := v3rpc.Server(s, tls)
+		grpcS, err = v3rpc.Server(s, tls)
 		if err != nil {
 			s.Stop()
 			<-s.StopNotify()
 			return nil, err
 		}
-		go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
+	}
+
+	// Start the peer server in a goroutine
+	for _, l := range plns {
+		go func(l net.Listener) {
+			plog.Fatal(serve(l, nil, ph, 5*time.Minute))
+		}(l)
+	}
+	// Start a client server goroutine for each listen address
+	for _, l := range clns {
+		go func(l net.Listener) {
+			// read timeout does not work with http close notify
+			// TODO: https://github.com/golang/go/issues/9524
+			plog.Fatal(serve(l, grpcS, ch, 0))
+		}(l)
 	}
 
 	return s.StopNotify(), nil

+ 0 - 2
etcdmain/help.go

@@ -139,8 +139,6 @@ experimental flags:
 		enable experimental v3 demo API.
 	--experimental-auto-compaction-retention '0'
 		auto compaction retention in hour. 0 means disable auto compaction.
-	--experimental-gRPC-addr '127.0.0.1:2378'
-		gRPC address for experimental v3 demo API.
 
 profiling flags:
 	--enable-pprof 'false'

+ 14 - 3
etcdmain/http.go → etcdmain/serve.go

@@ -20,14 +20,25 @@ import (
 	"net"
 	"net/http"
 	"time"
+
+	"github.com/cockroachdb/cmux"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 )
 
-// serveHTTP accepts incoming HTTP connections on the listener l,
+// serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
-func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
+func serve(l net.Listener, grpcS *grpc.Server, handler http.Handler, readTimeout time.Duration) error {
 	// TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which
 	// expect a TLS Conn type.
+	httpl := l
+	if grpcS != nil {
+		m := cmux.New(l)
+		grpcl := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
+		httpl = m.Match(cmux.Any())
+		go plog.Fatal(m.Serve())
+		go plog.Fatal(grpcS.Serve(grpcl))
+	}
 
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	// TODO: add debug flag; enable logging when debug flag is set
@@ -36,5 +47,5 @@ func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration)
 		ReadTimeout: readTimeout,
 		ErrorLog:    logger, // do not log user error
 	}
-	return srv.Serve(l)
+	return srv.Serve(httpl)
 }

+ 1 - 1
tools/benchmark/cmd/root.go

@@ -49,7 +49,7 @@ var (
 )
 
 func init() {
-	RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2378"}, "gRPC endpoints")
+	RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
 	RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections")
 	RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients")
 

+ 5 - 10
tools/functional-tester/etcd-tester/cluster.go

@@ -44,7 +44,6 @@ type cluster struct {
 	Agents     []client.Agent
 	Stressers  []Stresser
 	Names      []string
-	GRPCURLs   []string
 	ClientURLs []string
 }
 
@@ -89,7 +88,6 @@ func (c *cluster) Bootstrap() error {
 		if err != nil {
 			return err
 		}
-		grpcURLs[i] = fmt.Sprintf("%s:2378", host)
 		clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
 		peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort)
 
@@ -115,9 +113,7 @@ func (c *cluster) Bootstrap() error {
 		}
 		if !c.v2Only {
 			flags = append(flags,
-				"--experimental-v3demo",
-				"--experimental-gRPC-addr", grpcURLs[i],
-			)
+				"--experimental-v3demo")
 		}
 
 		if _, err := a.Start(flags...); err != nil {
@@ -161,7 +157,6 @@ func (c *cluster) Bootstrap() error {
 	c.Agents = agents
 	c.Stressers = stressers
 	c.Names = names
-	c.GRPCURLs = grpcURLs
 	c.ClientURLs = clientURLs
 	return nil
 }
@@ -172,7 +167,7 @@ func (c *cluster) WaitHealth() error {
 	// TODO: set it to a reasonable value. It is set that high because
 	// follower may use long time to catch up the leader when reboot under
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
-	healthFunc, urls := setHealthKey, c.GRPCURLs
+	healthFunc, urls := setHealthKey, c.ClientURLs
 	if c.v2Only {
 		healthFunc, urls = setHealthKeyV2, c.ClientURLs
 	}
@@ -192,7 +187,7 @@ func (c *cluster) GetLeader() (int, error) {
 		return 0, nil
 	}
 	cli, err := clientv3.New(clientv3.Config{
-		Endpoints:   c.GRPCURLs,
+		Endpoints:   c.ClientURLs,
 		DialTimeout: 5 * time.Second,
 	})
 	if err != nil {
@@ -304,7 +299,7 @@ func setHealthKeyV2(us []string) error {
 func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
 	revs := make(map[string]int64)
 	hashes := make(map[string]int64)
-	for _, u := range c.GRPCURLs {
+	for _, u := range c.ClientURLs {
 		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		if err != nil {
 			return nil, nil, err
@@ -328,7 +323,7 @@ func (c *cluster) compactKV(rev int64) error {
 		conn *grpc.ClientConn
 		err  error
 	)
-	for _, u := range c.GRPCURLs {
+	for _, u := range c.ClientURLs {
 		conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		if err != nil {
 			continue