Browse Source

Merge pull request #5337 from gyuho/configurable_monitor_interval

etcdmain: gateway monitor-interval flag
Gyu-Ho Lee 9 years ago
parent
commit
f4d1501198
2 changed files with 25 additions and 11 deletions
  1. 8 7
      etcdmain/gateway.go
  2. 17 4
      proxy/tcpproxy/userspace.go

+ 8 - 7
etcdmain/gateway.go

@@ -18,7 +18,7 @@ import (
 	"fmt"
 	"net"
 	"os"
-	"strings"
+	"time"
 
 	"github.com/coreos/etcd/proxy/tcpproxy"
 	"github.com/spf13/cobra"
@@ -26,7 +26,8 @@ import (
 
 var (
 	gatewayListenAddr string
-	gatewayEndpoints  string
+	gatewayEndpoints  []string
+	getewayRetryDelay time.Duration
 )
 
 var (
@@ -60,14 +61,13 @@ func newGatewayStartCommand() *cobra.Command {
 	}
 
 	cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
-	cmd.Flags().StringVar(&gatewayEndpoints, "endpoints", "127.0.0.1:2379", "comma separated etcd cluster endpoints")
+	cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
+	cmd.Flags().DurationVar(&getewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")
 
 	return &cmd
 }
 
 func startGateway(cmd *cobra.Command, args []string) {
-	endpoints := strings.Split(gatewayEndpoints, ",")
-
 	l, err := net.Listen("tcp", gatewayListenAddr)
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err)
@@ -75,8 +75,9 @@ func startGateway(cmd *cobra.Command, args []string) {
 	}
 
 	tp := tcpproxy.TCPProxy{
-		Listener:  l,
-		Endpoints: endpoints,
+		Listener:        l,
+		Endpoints:       gatewayEndpoints,
+		MonitorInterval: getewayRetryDelay,
 	}
 
 	tp.Run()

+ 17 - 4
proxy/tcpproxy/userspace.go

@@ -19,6 +19,12 @@ import (
 	"net"
 	"sync"
 	"time"
+
+	"github.com/coreos/pkg/capnslog"
+)
+
+var (
+	plog = capnslog.NewPackageLogger("github.com/coreos/etcd/proxy", "tcpproxy")
 )
 
 type remote struct {
@@ -33,16 +39,16 @@ func (r *remote) inactivate() {
 	r.inactive = true
 }
 
-func (r *remote) tryReactivate() {
+func (r *remote) tryReactivate() error {
 	conn, err := net.Dial("tcp", r.addr)
 	if err != nil {
-		return
+		return err
 	}
 	conn.Close()
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.inactive = false
-	return
+	return nil
 }
 
 func (r *remote) isActive() bool {
@@ -106,6 +112,7 @@ func (tp *TCPProxy) serve(in net.Conn) {
 			break
 		}
 		remote.inactivate()
+		plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval)
 	}
 
 	if out == nil {
@@ -141,7 +148,13 @@ func (tp *TCPProxy) runMonitor() {
 			tp.mu.Lock()
 			for _, r := range tp.remotes {
 				if !r.isActive() {
-					go r.tryReactivate()
+					go func() {
+						if err := r.tryReactivate(); err != nil {
+							plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
+						} else {
+							plog.Printf("activated %s", r.addr)
+						}
+					}()
 				}
 			}
 			tp.mu.Unlock()