Browse Source

etcd-agent: use "pkg/transport.Proxy"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 8 years ago
parent
commit
a912ddcf20
1 changed files with 145 additions and 6 deletions
  1. 145 6
      tools/functional-tester/etcd-agent/agent.go

+ 145 - 6
tools/functional-tester/etcd-agent/agent.go

@@ -15,14 +15,19 @@
 package main
 
 import (
+	"fmt"
+	"net"
+	"net/url"
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strconv"
+	"sync"
 	"syscall"
 	"time"
 
 	"github.com/coreos/etcd/pkg/fileutil"
-	"github.com/coreos/etcd/pkg/netutil"
+	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
 )
 
@@ -40,6 +45,9 @@ type Agent struct {
 	logfile *os.File
 
 	cfg AgentConfig
+
+	pmu                  sync.Mutex
+	advertisePortToProxy map[int]transport.Proxy
 }
 
 type AgentConfig struct {
@@ -68,7 +76,13 @@ func newAgent(cfg AgentConfig) (*Agent, error) {
 		return nil, err
 	}
 
-	return &Agent{state: stateUninitialized, cmd: c, logfile: f, cfg: cfg}, nil
+	return &Agent{
+		state:                stateUninitialized,
+		cmd:                  c,
+		logfile:              f,
+		cfg:                  cfg,
+		advertisePortToProxy: make(map[int]transport.Proxy),
+	}, nil
 }
 
 // start starts a new etcd process with the given args.
@@ -84,6 +98,85 @@ func (a *Agent) start(args ...string) error {
 	}
 
 	a.state = stateStarted
+
+	a.pmu.Lock()
+	defer a.pmu.Unlock()
+	if len(a.advertisePortToProxy) == 0 {
+		// enough time for etcd start before setting up proxy
+		time.Sleep(time.Second)
+		var (
+			err                    error
+			s                      string
+			listenClientURL        *url.URL
+			advertiseClientURL     *url.URL
+			advertiseClientURLPort int
+			listenPeerURL          *url.URL
+			advertisePeerURL       *url.URL
+			advertisePeerURLPort   int
+		)
+		for i := range args {
+			switch args[i] {
+			case "--listen-client-urls":
+				listenClientURL, err = url.Parse(args[i+1])
+				if err != nil {
+					return err
+				}
+			case "--advertise-client-urls":
+				advertiseClientURL, err = url.Parse(args[i+1])
+				if err != nil {
+					return err
+				}
+				_, s, err = net.SplitHostPort(advertiseClientURL.Host)
+				if err != nil {
+					return err
+				}
+				advertiseClientURLPort, err = strconv.Atoi(s)
+				if err != nil {
+					return err
+				}
+			case "--listen-peer-urls":
+				listenPeerURL, err = url.Parse(args[i+1])
+				if err != nil {
+					return err
+				}
+			case "--initial-advertise-peer-urls":
+				advertisePeerURL, err = url.Parse(args[i+1])
+				if err != nil {
+					return err
+				}
+				_, s, err = net.SplitHostPort(advertisePeerURL.Host)
+				if err != nil {
+					return err
+				}
+				advertisePeerURLPort, err = strconv.Atoi(s)
+				if err != nil {
+					return err
+				}
+			}
+		}
+
+		clientProxy := transport.NewProxy(transport.ProxyConfig{
+			From: *advertiseClientURL,
+			To:   *listenClientURL,
+		})
+		select {
+		case err = <-clientProxy.Error():
+			return err
+		case <-time.After(time.Second):
+		}
+		a.advertisePortToProxy[advertiseClientURLPort] = clientProxy
+
+		peerProxy := transport.NewProxy(transport.ProxyConfig{
+			From: *advertisePeerURL,
+			To:   *listenPeerURL,
+		})
+		select {
+		case err = <-peerProxy.Error():
+			return err
+		case <-time.After(time.Second):
+		}
+		a.advertisePortToProxy[advertisePeerURLPort] = peerProxy
+	}
 	return nil
 }
 
@@ -93,6 +186,24 @@ func (a *Agent) stopWithSig(sig os.Signal) error {
 		return nil
 	}
 
+	a.pmu.Lock()
+	if len(a.advertisePortToProxy) > 0 {
+		for _, p := range a.advertisePortToProxy {
+			if err := p.Close(); err != nil {
+				a.pmu.Unlock()
+				return err
+			}
+			select {
+			case <-p.Done():
+				// enough time to release port
+				time.Sleep(time.Second)
+			case <-time.After(time.Second):
+			}
+		}
+		a.advertisePortToProxy = make(map[int]transport.Proxy)
+	}
+	a.pmu.Unlock()
+
 	err := stopWithSig(a.cmd, sig)
 	if err != nil {
 		return err
@@ -177,18 +288,46 @@ func (a *Agent) terminate() error {
 }
 
 func (a *Agent) dropPort(port int) error {
-	return netutil.DropPort(port)
+	a.pmu.Lock()
+	defer a.pmu.Unlock()
+
+	p, ok := a.advertisePortToProxy[port]
+	if !ok {
+		return fmt.Errorf("%d does not have proxy", port)
+	}
+	p.BlackholeTx()
+	p.BlackholeRx()
+	return nil
 }
 
 func (a *Agent) recoverPort(port int) error {
-	return netutil.RecoverPort(port)
+	a.pmu.Lock()
+	defer a.pmu.Unlock()
+
+	p, ok := a.advertisePortToProxy[port]
+	if !ok {
+		return fmt.Errorf("%d does not have proxy", port)
+	}
+	p.UnblackholeTx()
+	p.UnblackholeRx()
+	return nil
 }
 
 func (a *Agent) setLatency(ms, rv int) error {
+	a.pmu.Lock()
+	defer a.pmu.Unlock()
+
 	if ms == 0 {
-		return netutil.RemoveLatency()
+		for _, p := range a.advertisePortToProxy {
+			p.UndelayTx()
+			p.UndelayRx()
+		}
 	}
-	return netutil.SetLatency(ms, rv)
+	for _, p := range a.advertisePortToProxy {
+		p.DelayTx(time.Duration(ms)*time.Millisecond, time.Duration(rv)*time.Millisecond)
+		p.DelayRx(time.Duration(ms)*time.Millisecond, time.Duration(rv)*time.Millisecond)
+	}
+	return nil
 }
 
 func (a *Agent) status() client.Status {