Quellcode durchsuchen

Merge pull request #5092 from xiang90/etcdlet

*: gateway initial commit
Xiang Li vor 9 Jahren
Ursprung
Commit
aaefd52afa
5 geänderte Dateien mit 155 neuen und 33 gelöschten Zeilen
  1. 1 3
      etcdmain/etcd.go
  2. 83 0
      etcdmain/gateway.go
  3. 37 0
      etcdmain/main.go
  4. 29 19
      proxy/tcpproxy/userspace.go
  5. 5 11
      proxy/tcpproxy/userspace_test.go

+ 1 - 3
etcdmain/etcd.go

@@ -75,9 +75,7 @@ var (
 	dirEmpty  = dirType("empty")
 )
 
-func Main() {
-	checkSupportArch()
-
+func startEtcdOrProxyV2() {
 	cfg := NewConfig()
 	err := cfg.Parse(os.Args[1:])
 	if err != nil {

+ 83 - 0
etcdmain/gateway.go

@@ -0,0 +1,83 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdmain
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"strings"
+
+	"github.com/coreos/etcd/proxy/tcpproxy"
+	"github.com/spf13/cobra"
+)
+
+var (
+	gatewayListenAddr string
+	gatewayEndpoints  string
+)
+
+var (
+	rootCmd = &cobra.Command{
+		Use:        "etcd",
+		Short:      "etcd server",
+		SuggestFor: []string{"etcd"},
+	}
+)
+
+func init() {
+	rootCmd.AddCommand(newGatewayCommand())
+}
+
+// newGatewayCommand returns the cobra command for "gateway".
+func newGatewayCommand() *cobra.Command {
+	lpc := &cobra.Command{
+		Use:   "gateway <subcommand>",
+		Short: "gateway related command",
+	}
+	lpc.AddCommand(newGatewayStartCommand())
+
+	return lpc
+}
+
+func newGatewayStartCommand() *cobra.Command {
+	cmd := cobra.Command{
+		Use:   "start",
+		Short: "start the gateway",
+		Run:   startGateway,
+	}
+
+	cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
+	cmd.Flags().StringVar(&gatewayEndpoints, "endpoints", "127.0.0.1:2379", "comma separated etcd cluster endpoints")
+
+	return &cmd
+}
+
+func startGateway(cmd *cobra.Command, args []string) {
+	endpoints := strings.Split(gatewayEndpoints, ",")
+
+	l, err := net.Listen("tcp", gatewayListenAddr)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+
+	tp := tcpproxy.TCPProxy{
+		Listener:  l,
+		Endpoints: endpoints,
+	}
+
+	tp.Run()
+}

+ 37 - 0
etcdmain/main.go

@@ -0,0 +1,37 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdmain
+
+import (
+	"fmt"
+	"os"
+)
+
+func Main() {
+	checkSupportArch()
+
+	if len(os.Args) > 1 {
+		switch os.Args[1] {
+		case "gateway":
+			if err := rootCmd.Execute(); err != nil {
+				fmt.Fprint(os.Stderr, err)
+				os.Exit(1)
+			}
+			return
+		}
+	}
+
+	startEtcdOrProxyV2()
+}

+ 29 - 19
proxy/tcpproxy/userspace.go

@@ -21,16 +21,6 @@ import (
 	"time"
 )
 
-type tcpProxy struct {
-	l               net.Listener
-	monitorInterval time.Duration
-	donec           chan struct{}
-
-	mu         sync.Mutex // guards the following fields
-	remotes    []*remote
-	nextRemote int
-}
-
 type remote struct {
 	mu       sync.Mutex
 	addr     string
@@ -61,10 +51,30 @@ func (r *remote) isActive() bool {
 	return !r.inactive
 }
 
-func (tp *tcpProxy) run() error {
+type TCPProxy struct {
+	Listener        net.Listener
+	Endpoints       []string
+	MonitorInterval time.Duration
+
+	donec chan struct{}
+
+	mu         sync.Mutex // guards the following fields
+	remotes    []*remote
+	nextRemote int
+}
+
+func (tp *TCPProxy) Run() error {
+	tp.donec = make(chan struct{})
+	if tp.MonitorInterval == 0 {
+		tp.MonitorInterval = 5 * time.Minute
+	}
+	for _, ep := range tp.Endpoints {
+		tp.remotes = append(tp.remotes, &remote{addr: ep})
+	}
+
 	go tp.runMonitor()
 	for {
-		in, err := tp.l.Accept()
+		in, err := tp.Listener.Accept()
 		if err != nil {
 			return err
 		}
@@ -73,13 +83,13 @@ func (tp *tcpProxy) run() error {
 	}
 }
 
-func (tp *tcpProxy) numRemotes() int {
+func (tp *TCPProxy) numRemotes() int {
 	tp.mu.Lock()
 	defer tp.mu.Unlock()
 	return len(tp.remotes)
 }
 
-func (tp *tcpProxy) serve(in net.Conn) {
+func (tp *TCPProxy) serve(in net.Conn) {
 	var (
 		err error
 		out net.Conn
@@ -115,7 +125,7 @@ func (tp *tcpProxy) serve(in net.Conn) {
 }
 
 // pick picks a remote in round-robin fashion
-func (tp *tcpProxy) pick() *remote {
+func (tp *TCPProxy) pick() *remote {
 	tp.mu.Lock()
 	defer tp.mu.Unlock()
 
@@ -124,10 +134,10 @@ func (tp *tcpProxy) pick() *remote {
 	return picked
 }
 
-func (tp *tcpProxy) runMonitor() {
+func (tp *TCPProxy) runMonitor() {
 	for {
 		select {
-		case <-time.After(tp.monitorInterval):
+		case <-time.After(tp.MonitorInterval):
 			tp.mu.Lock()
 			for _, r := range tp.remotes {
 				if !r.isActive() {
@@ -141,9 +151,9 @@ func (tp *tcpProxy) runMonitor() {
 	}
 }
 
-func (tp *tcpProxy) stop() {
+func (tp *TCPProxy) Stop() {
 	// graceful shutdown?
 	// shutdown current connections?
-	tp.l.Close()
+	tp.Listener.Close()
 	close(tp.donec)
 }

+ 5 - 11
proxy/tcpproxy/userspace_test.go

@@ -22,7 +22,6 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"testing"
-	"time"
 )
 
 func TestUserspaceProxy(t *testing.T) {
@@ -43,17 +42,12 @@ func TestUserspaceProxy(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	p := tcpProxy{
-		l:               l,
-		donec:           make(chan struct{}),
-		monitorInterval: time.Second,
-
-		remotes: []*remote{
-			{addr: u.Host},
-		},
+	p := TCPProxy{
+		Listener:  l,
+		Endpoints: []string{u.Host},
 	}
-	go p.run()
-	defer p.stop()
+	go p.Run()
+	defer p.Stop()
 
 	u.Host = l.Addr().String()