|
@@ -188,7 +188,7 @@ func (c *cluster) fillClusterForMembers() error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
|
|
|
|
|
|
|
+func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
|
|
|
c := &cluster{cfg: cfg}
|
|
c := &cluster{cfg: cfg}
|
|
|
ms := make([]*member, cfg.Size)
|
|
ms := make([]*member, cfg.Size)
|
|
|
for i := 0; i < cfg.Size; i++ {
|
|
for i := 0; i < cfg.Size; i++ {
|
|
@@ -204,16 +204,16 @@ func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
|
|
|
|
|
|
|
|
// NewCluster returns an unlaunched cluster of the given size which has been
|
|
// NewCluster returns an unlaunched cluster of the given size which has been
|
|
|
// set to use static bootstrap.
|
|
// set to use static bootstrap.
|
|
|
-func NewCluster(t *testing.T, size int) *cluster {
|
|
|
|
|
|
|
+func NewCluster(t testing.TB, size int) *cluster {
|
|
|
return newCluster(t, &ClusterConfig{Size: size})
|
|
return newCluster(t, &ClusterConfig{Size: size})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
|
|
// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
|
|
|
-func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
|
|
|
|
|
|
|
+func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
|
|
|
return newCluster(t, cfg)
|
|
return newCluster(t, cfg)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) Launch(t *testing.T) {
|
|
|
|
|
|
|
+func (c *cluster) Launch(t testing.TB) {
|
|
|
errc := make(chan error)
|
|
errc := make(chan error)
|
|
|
for _, m := range c.Members {
|
|
for _, m := range c.Members {
|
|
|
// Members are launched in separate goroutines because if they boot
|
|
// Members are launched in separate goroutines because if they boot
|
|
@@ -274,7 +274,7 @@ func (c *cluster) HTTPMembers() []client.Member {
|
|
|
return ms
|
|
return ms
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) mustNewMember(t *testing.T) *member {
|
|
|
|
|
|
|
+func (c *cluster) mustNewMember(t testing.TB) *member {
|
|
|
m := mustNewMember(t,
|
|
m := mustNewMember(t,
|
|
|
memberConfig{
|
|
memberConfig{
|
|
|
name: c.name(rand.Int()),
|
|
name: c.name(rand.Int()),
|
|
@@ -303,7 +303,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
|
|
|
return m
|
|
return m
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) addMember(t *testing.T) {
|
|
|
|
|
|
|
+func (c *cluster) addMember(t testing.TB) {
|
|
|
m := c.mustNewMember(t)
|
|
m := c.mustNewMember(t)
|
|
|
|
|
|
|
|
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
|
|
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
|
|
@@ -335,7 +335,7 @@ func (c *cluster) addMember(t *testing.T) {
|
|
|
c.waitMembersMatch(t, c.HTTPMembers())
|
|
c.waitMembersMatch(t, c.HTTPMembers())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
|
|
|
|
|
|
|
+func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
|
|
|
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
|
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
|
|
ma := client.NewMembersAPI(cc)
|
|
ma := client.NewMembersAPI(cc)
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
@@ -351,17 +351,17 @@ func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) AddMember(t *testing.T) {
|
|
|
|
|
|
|
+func (c *cluster) AddMember(t testing.TB) {
|
|
|
c.addMember(t)
|
|
c.addMember(t)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
|
|
|
|
|
|
+func (c *cluster) RemoveMember(t testing.TB, id uint64) {
|
|
|
if err := c.removeMember(t, id); err != nil {
|
|
if err := c.removeMember(t, id); err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) removeMember(t *testing.T, id uint64) error {
|
|
|
|
|
|
|
+func (c *cluster) removeMember(t testing.TB, id uint64) error {
|
|
|
// send remove request to the cluster
|
|
// send remove request to the cluster
|
|
|
cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
|
cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
|
|
ma := client.NewMembersAPI(cc)
|
|
ma := client.NewMembersAPI(cc)
|
|
@@ -392,7 +392,7 @@ func (c *cluster) removeMember(t *testing.T, id uint64) error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) Terminate(t *testing.T) {
|
|
|
|
|
|
|
+func (c *cluster) Terminate(t testing.TB) {
|
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
|
wg.Add(len(c.Members))
|
|
wg.Add(len(c.Members))
|
|
|
for _, m := range c.Members {
|
|
for _, m := range c.Members {
|
|
@@ -404,7 +404,7 @@ func (c *cluster) Terminate(t *testing.T) {
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
|
|
|
|
|
|
+func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
|
|
|
for _, u := range c.URLs() {
|
|
for _, u := range c.URLs() {
|
|
|
cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
|
cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
|
|
ma := client.NewMembersAPI(cc)
|
|
ma := client.NewMembersAPI(cc)
|
|
@@ -420,10 +420,10 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
|
|
|
|
|
|
|
+func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
|
|
|
|
|
|
|
|
// waitLeader waits until given members agree on the same leader.
|
|
// waitLeader waits until given members agree on the same leader.
|
|
|
-func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
|
|
|
|
|
|
|
+func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
|
|
|
possibleLead := make(map[uint64]bool)
|
|
possibleLead := make(map[uint64]bool)
|
|
|
var lead uint64
|
|
var lead uint64
|
|
|
for _, m := range membs {
|
|
for _, m := range membs {
|
|
@@ -516,14 +516,14 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
|
|
|
return reflect.DeepEqual(membs, wmembs)
|
|
return reflect.DeepEqual(membs, wmembs)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func newLocalListener(t *testing.T) net.Listener {
|
|
|
|
|
|
|
+func newLocalListener(t testing.TB) net.Listener {
|
|
|
c := atomic.AddInt64(&localListenCount, 1)
|
|
c := atomic.AddInt64(&localListenCount, 1)
|
|
|
// Go 1.8+ allows only numbers in port
|
|
// Go 1.8+ allows only numbers in port
|
|
|
addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
|
|
addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
|
|
|
return NewListenerWithAddr(t, addr)
|
|
return NewListenerWithAddr(t, addr)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
|
|
|
|
|
|
|
+func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
|
|
|
l, err := transport.NewUnixListener(addr)
|
|
l, err := transport.NewUnixListener(addr)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
@@ -583,7 +583,7 @@ type memberConfig struct {
|
|
|
|
|
|
|
|
// mustNewMember return an inited member with the given name. If peerTLS is
|
|
// mustNewMember return an inited member with the given name. If peerTLS is
|
|
|
// set, it will use https scheme to communicate between peers.
|
|
// set, it will use https scheme to communicate between peers.
|
|
|
-func mustNewMember(t *testing.T, mcfg memberConfig) *member {
|
|
|
|
|
|
|
+func mustNewMember(t testing.TB, mcfg memberConfig) *member {
|
|
|
var err error
|
|
var err error
|
|
|
m := &member{}
|
|
m := &member{}
|
|
|
|
|
|
|
@@ -759,7 +759,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
|
|
|
|
|
|
|
|
// Clone returns a member with the same server configuration. The returned
|
|
// Clone returns a member with the same server configuration. The returned
|
|
|
// member will not set PeerListeners and ClientListeners.
|
|
// member will not set PeerListeners and ClientListeners.
|
|
|
-func (m *member) Clone(t *testing.T) *member {
|
|
|
|
|
|
|
+func (m *member) Clone(t testing.TB) *member {
|
|
|
mm := &member{}
|
|
mm := &member{}
|
|
|
mm.ServerConfig = m.ServerConfig
|
|
mm.ServerConfig = m.ServerConfig
|
|
|
|
|
|
|
@@ -959,14 +959,14 @@ func (m *member) Launch() error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (m *member) WaitOK(t *testing.T) {
|
|
|
|
|
|
|
+func (m *member) WaitOK(t testing.TB) {
|
|
|
m.WaitStarted(t)
|
|
m.WaitStarted(t)
|
|
|
for m.s.Leader() == 0 {
|
|
for m.s.Leader() == 0 {
|
|
|
time.Sleep(tickDuration)
|
|
time.Sleep(tickDuration)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (m *member) WaitStarted(t *testing.T) {
|
|
|
|
|
|
|
+func (m *member) WaitStarted(t testing.TB) {
|
|
|
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
|
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
|
|
kapi := client.NewKeysAPI(cc)
|
|
kapi := client.NewKeysAPI(cc)
|
|
|
for {
|
|
for {
|
|
@@ -981,7 +981,7 @@ func (m *member) WaitStarted(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func WaitClientV3(t *testing.T, kv clientv3.KV) {
|
|
|
|
|
|
|
+func WaitClientV3(t testing.TB, kv clientv3.KV) {
|
|
|
timeout := time.Now().Add(requestTimeout)
|
|
timeout := time.Now().Add(requestTimeout)
|
|
|
var err error
|
|
var err error
|
|
|
for time.Now().Before(timeout) {
|
|
for time.Now().Before(timeout) {
|
|
@@ -1035,7 +1035,7 @@ func (m *member) Close() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Stop stops the member, but the data dir of the member is preserved.
|
|
// Stop stops the member, but the data dir of the member is preserved.
|
|
|
-func (m *member) Stop(t *testing.T) {
|
|
|
|
|
|
|
+func (m *member) Stop(t testing.TB) {
|
|
|
lg.Info(
|
|
lg.Info(
|
|
|
"stopping a member",
|
|
"stopping a member",
|
|
|
zap.String("name", m.Name),
|
|
zap.String("name", m.Name),
|
|
@@ -1069,7 +1069,7 @@ func (m *member) StopNotify() <-chan struct{} {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Restart starts the member using the preserved data dir.
|
|
// Restart starts the member using the preserved data dir.
|
|
|
-func (m *member) Restart(t *testing.T) error {
|
|
|
|
|
|
|
+func (m *member) Restart(t testing.TB) error {
|
|
|
lg.Info(
|
|
lg.Info(
|
|
|
"restarting a member",
|
|
"restarting a member",
|
|
|
zap.String("name", m.Name),
|
|
zap.String("name", m.Name),
|
|
@@ -1107,7 +1107,7 @@ func (m *member) Restart(t *testing.T) error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Terminate stops the member and removes the data dir.
|
|
// Terminate stops the member and removes the data dir.
|
|
|
-func (m *member) Terminate(t *testing.T) {
|
|
|
|
|
|
|
+func (m *member) Terminate(t testing.TB) {
|
|
|
lg.Info(
|
|
lg.Info(
|
|
|
"terminating a member",
|
|
"terminating a member",
|
|
|
zap.String("name", m.Name),
|
|
zap.String("name", m.Name),
|
|
@@ -1157,7 +1157,7 @@ func (m *member) Metric(metricName string) (string, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// InjectPartition drops connections from m to others, vice versa.
|
|
// InjectPartition drops connections from m to others, vice versa.
|
|
|
-func (m *member) InjectPartition(t *testing.T, others ...*member) {
|
|
|
|
|
|
|
+func (m *member) InjectPartition(t testing.TB, others ...*member) {
|
|
|
for _, other := range others {
|
|
for _, other := range others {
|
|
|
m.s.CutPeer(other.s.ID())
|
|
m.s.CutPeer(other.s.ID())
|
|
|
other.s.CutPeer(m.s.ID())
|
|
other.s.CutPeer(m.s.ID())
|
|
@@ -1165,14 +1165,14 @@ func (m *member) InjectPartition(t *testing.T, others ...*member) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// RecoverPartition recovers connections from m to others, vice versa.
|
|
// RecoverPartition recovers connections from m to others, vice versa.
|
|
|
-func (m *member) RecoverPartition(t *testing.T, others ...*member) {
|
|
|
|
|
|
|
+func (m *member) RecoverPartition(t testing.TB, others ...*member) {
|
|
|
for _, other := range others {
|
|
for _, other := range others {
|
|
|
m.s.MendPeer(other.s.ID())
|
|
m.s.MendPeer(other.s.ID())
|
|
|
other.s.MendPeer(m.s.ID())
|
|
other.s.MendPeer(m.s.ID())
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
|
|
|
|
|
|
+func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
|
|
|
cfgtls := transport.TLSInfo{}
|
|
cfgtls := transport.TLSInfo{}
|
|
|
if tls != nil {
|
|
if tls != nil {
|
|
|
cfgtls = *tls
|
|
cfgtls = *tls
|
|
@@ -1185,7 +1185,7 @@ func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) clien
|
|
|
return c
|
|
return c
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
|
|
|
|
|
|
|
+func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
|
|
|
// tick in integration test is short, so 1s dial timeout could play well.
|
|
// tick in integration test is short, so 1s dial timeout could play well.
|
|
|
tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
|
tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -1211,7 +1211,7 @@ type ClusterV3 struct {
|
|
|
|
|
|
|
|
// NewClusterV3 returns a launched cluster with a grpc client connection
|
|
// NewClusterV3 returns a launched cluster with a grpc client connection
|
|
|
// for each cluster member.
|
|
// for each cluster member.
|
|
|
-func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
|
|
|
|
|
|
|
+func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
|
|
|
cfg.UseGRPC = true
|
|
cfg.UseGRPC = true
|
|
|
if os.Getenv("CLIENT_DEBUG") != "" {
|
|
if os.Getenv("CLIENT_DEBUG") != "" {
|
|
|
clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
|
|
clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
|
|
@@ -1240,7 +1240,7 @@ func (c *ClusterV3) TakeClient(idx int) {
|
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *ClusterV3) Terminate(t *testing.T) {
|
|
|
|
|
|
|
+func (c *ClusterV3) Terminate(t testing.TB) {
|
|
|
c.mu.Lock()
|
|
c.mu.Lock()
|
|
|
for _, client := range c.clients {
|
|
for _, client := range c.clients {
|
|
|
if client == nil {
|
|
if client == nil {
|