cluster.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "github.com/coreos/etcd/client"
  34. "github.com/coreos/etcd/clientv3"
  35. "github.com/coreos/etcd/embed"
  36. "github.com/coreos/etcd/etcdserver"
  37. "github.com/coreos/etcd/etcdserver/api/etcdhttp"
  38. "github.com/coreos/etcd/etcdserver/api/rafthttp"
  39. "github.com/coreos/etcd/etcdserver/api/v2http"
  40. "github.com/coreos/etcd/etcdserver/api/v3client"
  41. "github.com/coreos/etcd/etcdserver/api/v3election"
  42. epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  43. "github.com/coreos/etcd/etcdserver/api/v3lock"
  44. lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  46. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  47. "github.com/coreos/etcd/pkg/testutil"
  48. "github.com/coreos/etcd/pkg/tlsutil"
  49. "github.com/coreos/etcd/pkg/transport"
  50. "github.com/coreos/etcd/pkg/types"
  51. "github.com/soheilhy/cmux"
  52. "go.uber.org/zap"
  53. "golang.org/x/crypto/bcrypt"
  54. "google.golang.org/grpc"
  55. "google.golang.org/grpc/grpclog"
  56. "google.golang.org/grpc/keepalive"
  57. )
  58. const (
  59. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  60. RequestWaitTimeout = 3 * time.Second
  61. tickDuration = 10 * time.Millisecond
  62. requestTimeout = 20 * time.Second
  63. clusterName = "etcd"
  64. basePort = 21000
  65. UrlScheme = "unix"
  66. UrlSchemeTLS = "unixs"
  67. )
  68. var (
  69. electionTicks = 10
  70. // integration test uses unique ports, counting up, to listen for each
  71. // member, ensuring restarted members can listen on the same port again.
  72. localListenCount int64 = 0
  73. testTLSInfo = transport.TLSInfo{
  74. KeyFile: "./fixtures/server.key.insecure",
  75. CertFile: "./fixtures/server.crt",
  76. TrustedCAFile: "./fixtures/ca.crt",
  77. ClientCertAuth: true,
  78. }
  79. testTLSInfoIP = transport.TLSInfo{
  80. KeyFile: "./fixtures/server-ip.key.insecure",
  81. CertFile: "./fixtures/server-ip.crt",
  82. TrustedCAFile: "./fixtures/ca.crt",
  83. ClientCertAuth: true,
  84. }
  85. testTLSInfoExpired = transport.TLSInfo{
  86. KeyFile: "./fixtures-expired/server.key.insecure",
  87. CertFile: "./fixtures-expired/server.crt",
  88. TrustedCAFile: "./fixtures-expired/ca.crt",
  89. ClientCertAuth: true,
  90. }
  91. testTLSInfoExpiredIP = transport.TLSInfo{
  92. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  93. CertFile: "./fixtures-expired/server-ip.crt",
  94. TrustedCAFile: "./fixtures-expired/ca.crt",
  95. ClientCertAuth: true,
  96. }
  97. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  98. lg = zap.NewNop()
  99. )
  100. func init() {
  101. if os.Getenv("CLUSTER_DEBUG") != "" {
  102. lg, _ = zap.NewProduction()
  103. }
  104. }
  105. type ClusterConfig struct {
  106. Size int
  107. PeerTLS *transport.TLSInfo
  108. ClientTLS *transport.TLSInfo
  109. DiscoveryURL string
  110. AuthToken string
  111. UseGRPC bool
  112. QuotaBackendBytes int64
  113. MaxTxnOps uint
  114. MaxRequestBytes uint
  115. SnapshotCount uint64
  116. SnapshotCatchUpEntries uint64
  117. GRPCKeepAliveMinTime time.Duration
  118. GRPCKeepAliveInterval time.Duration
  119. GRPCKeepAliveTimeout time.Duration
  120. // SkipCreatingClient to skip creating clients for each member.
  121. SkipCreatingClient bool
  122. ClientMaxCallSendMsgSize int
  123. ClientMaxCallRecvMsgSize int
  124. // UseIP is true to use only IP for gRPC requests.
  125. UseIP bool
  126. }
  127. type cluster struct {
  128. cfg *ClusterConfig
  129. Members []*member
  130. }
  131. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  132. if tls == nil {
  133. return UrlScheme
  134. }
  135. return UrlSchemeTLS
  136. }
  137. func (c *cluster) fillClusterForMembers() error {
  138. if c.cfg.DiscoveryURL != "" {
  139. // cluster will be discovered
  140. return nil
  141. }
  142. addrs := make([]string, 0)
  143. for _, m := range c.Members {
  144. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  145. for _, l := range m.PeerListeners {
  146. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  147. }
  148. }
  149. clusterStr := strings.Join(addrs, ",")
  150. var err error
  151. for _, m := range c.Members {
  152. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  153. if err != nil {
  154. return err
  155. }
  156. }
  157. return nil
  158. }
  159. func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
  160. c := &cluster{cfg: cfg}
  161. ms := make([]*member, cfg.Size)
  162. for i := 0; i < cfg.Size; i++ {
  163. ms[i] = c.mustNewMember(t)
  164. }
  165. c.Members = ms
  166. if err := c.fillClusterForMembers(); err != nil {
  167. t.Fatal(err)
  168. }
  169. return c
  170. }
  171. // NewCluster returns an unlaunched cluster of the given size which has been
  172. // set to use static bootstrap.
  173. func NewCluster(t *testing.T, size int) *cluster {
  174. return newCluster(t, &ClusterConfig{Size: size})
  175. }
  176. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  177. func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
  178. return newCluster(t, cfg)
  179. }
  180. func (c *cluster) Launch(t *testing.T) {
  181. errc := make(chan error)
  182. for _, m := range c.Members {
  183. // Members are launched in separate goroutines because if they boot
  184. // using discovery url, they have to wait for others to register to continue.
  185. go func(m *member) {
  186. errc <- m.Launch()
  187. }(m)
  188. }
  189. for range c.Members {
  190. if err := <-errc; err != nil {
  191. t.Fatalf("error setting up member: %v", err)
  192. }
  193. }
  194. // wait cluster to be stable to receive future client requests
  195. c.waitMembersMatch(t, c.HTTPMembers())
  196. c.waitVersion()
  197. }
  198. func (c *cluster) URL(i int) string {
  199. return c.Members[i].ClientURLs[0].String()
  200. }
  201. // URLs returns a list of all active client URLs in the cluster
  202. func (c *cluster) URLs() []string {
  203. return getMembersURLs(c.Members)
  204. }
  205. func getMembersURLs(members []*member) []string {
  206. urls := make([]string, 0)
  207. for _, m := range members {
  208. select {
  209. case <-m.s.StopNotify():
  210. continue
  211. default:
  212. }
  213. for _, u := range m.ClientURLs {
  214. urls = append(urls, u.String())
  215. }
  216. }
  217. return urls
  218. }
  219. // HTTPMembers returns a list of all active members as client.Members
  220. func (c *cluster) HTTPMembers() []client.Member {
  221. ms := []client.Member{}
  222. for _, m := range c.Members {
  223. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  224. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  225. cm := client.Member{Name: m.Name}
  226. for _, ln := range m.PeerListeners {
  227. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  228. }
  229. for _, ln := range m.ClientListeners {
  230. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  231. }
  232. ms = append(ms, cm)
  233. }
  234. return ms
  235. }
  236. func (c *cluster) mustNewMember(t *testing.T) *member {
  237. m := mustNewMember(t,
  238. memberConfig{
  239. name: c.name(rand.Int()),
  240. authToken: c.cfg.AuthToken,
  241. peerTLS: c.cfg.PeerTLS,
  242. clientTLS: c.cfg.ClientTLS,
  243. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  244. maxTxnOps: c.cfg.MaxTxnOps,
  245. maxRequestBytes: c.cfg.MaxRequestBytes,
  246. snapshotCount: c.cfg.SnapshotCount,
  247. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  248. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  249. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  250. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  251. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  252. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  253. useIP: c.cfg.UseIP,
  254. })
  255. m.DiscoveryURL = c.cfg.DiscoveryURL
  256. if c.cfg.UseGRPC {
  257. if err := m.listenGRPC(); err != nil {
  258. t.Fatal(err)
  259. }
  260. }
  261. return m
  262. }
  263. func (c *cluster) addMember(t *testing.T) {
  264. m := c.mustNewMember(t)
  265. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  266. // send add request to the cluster
  267. var err error
  268. for i := 0; i < len(c.Members); i++ {
  269. clientURL := c.URL(i)
  270. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  271. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  272. break
  273. }
  274. }
  275. if err != nil {
  276. t.Fatalf("add member failed on all members error: %v", err)
  277. }
  278. m.InitialPeerURLsMap = types.URLsMap{}
  279. for _, mm := range c.Members {
  280. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  281. }
  282. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  283. m.NewCluster = false
  284. if err := m.Launch(); err != nil {
  285. t.Fatal(err)
  286. }
  287. c.Members = append(c.Members, m)
  288. // wait cluster to be stable to receive future client requests
  289. c.waitMembersMatch(t, c.HTTPMembers())
  290. }
  291. func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
  292. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  293. ma := client.NewMembersAPI(cc)
  294. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  295. _, err := ma.Add(ctx, peerURL)
  296. cancel()
  297. if err != nil {
  298. return err
  299. }
  300. // wait for the add node entry applied in the cluster
  301. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  302. c.waitMembersMatch(t, members)
  303. return nil
  304. }
  305. func (c *cluster) AddMember(t *testing.T) {
  306. c.addMember(t)
  307. }
  308. func (c *cluster) RemoveMember(t *testing.T, id uint64) {
  309. if err := c.removeMember(t, id); err != nil {
  310. t.Fatal(err)
  311. }
  312. }
  313. func (c *cluster) removeMember(t *testing.T, id uint64) error {
  314. // send remove request to the cluster
  315. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  316. ma := client.NewMembersAPI(cc)
  317. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  318. err := ma.Remove(ctx, types.ID(id).String())
  319. cancel()
  320. if err != nil {
  321. return err
  322. }
  323. newMembers := make([]*member, 0)
  324. for _, m := range c.Members {
  325. if uint64(m.s.ID()) != id {
  326. newMembers = append(newMembers, m)
  327. } else {
  328. select {
  329. case <-m.s.StopNotify():
  330. m.Terminate(t)
  331. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  332. // TODO: remove connection write timeout by selecting on http response closeNotifier
  333. // blocking on https://github.com/golang/go/issues/9524
  334. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  335. t.Fatalf("failed to remove member %s in time", m.s.ID())
  336. }
  337. }
  338. }
  339. c.Members = newMembers
  340. c.waitMembersMatch(t, c.HTTPMembers())
  341. return nil
  342. }
  343. func (c *cluster) Terminate(t *testing.T) {
  344. var wg sync.WaitGroup
  345. wg.Add(len(c.Members))
  346. for _, m := range c.Members {
  347. go func(mm *member) {
  348. defer wg.Done()
  349. mm.Terminate(t)
  350. }(m)
  351. }
  352. wg.Wait()
  353. }
  354. func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
  355. for _, u := range c.URLs() {
  356. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  357. ma := client.NewMembersAPI(cc)
  358. for {
  359. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  360. ms, err := ma.List(ctx)
  361. cancel()
  362. if err == nil && isMembersEqual(ms, membs) {
  363. break
  364. }
  365. time.Sleep(tickDuration)
  366. }
  367. }
  368. }
  369. func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
  370. // waitLeader waits until given members agree on the same leader.
  371. func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
  372. possibleLead := make(map[uint64]bool)
  373. var lead uint64
  374. for _, m := range membs {
  375. possibleLead[uint64(m.s.ID())] = true
  376. }
  377. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  378. kapi := client.NewKeysAPI(cc)
  379. // ensure leader is up via linearizable get
  380. for {
  381. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  382. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  383. cancel()
  384. if err == nil || strings.Contains(err.Error(), "Key not found") {
  385. break
  386. }
  387. }
  388. for lead == 0 || !possibleLead[lead] {
  389. lead = 0
  390. for _, m := range membs {
  391. select {
  392. case <-m.s.StopNotify():
  393. continue
  394. default:
  395. }
  396. if lead != 0 && lead != m.s.Lead() {
  397. lead = 0
  398. time.Sleep(10 * tickDuration)
  399. break
  400. }
  401. lead = m.s.Lead()
  402. }
  403. }
  404. for i, m := range membs {
  405. if uint64(m.s.ID()) == lead {
  406. return i
  407. }
  408. }
  409. return -1
  410. }
  411. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  412. // waitNoLeader waits until given members lose leader.
  413. func (c *cluster) waitNoLeader(membs []*member) {
  414. noLeader := false
  415. for !noLeader {
  416. noLeader = true
  417. for _, m := range membs {
  418. select {
  419. case <-m.s.StopNotify():
  420. continue
  421. default:
  422. }
  423. if m.s.Lead() != 0 {
  424. noLeader = false
  425. time.Sleep(10 * tickDuration)
  426. break
  427. }
  428. }
  429. }
  430. }
  431. func (c *cluster) waitVersion() {
  432. for _, m := range c.Members {
  433. for {
  434. if m.s.ClusterVersion() != nil {
  435. break
  436. }
  437. time.Sleep(tickDuration)
  438. }
  439. }
  440. }
  441. func (c *cluster) name(i int) string {
  442. return fmt.Sprint(i)
  443. }
  444. // isMembersEqual checks whether two members equal except ID field.
  445. // The given wmembs should always set ID field to empty string.
  446. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  447. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  448. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  449. for i := range membs {
  450. membs[i].ID = ""
  451. }
  452. return reflect.DeepEqual(membs, wmembs)
  453. }
  454. func newLocalListener(t *testing.T) net.Listener {
  455. c := atomic.AddInt64(&localListenCount, 1)
  456. // Go 1.8+ allows only numbers in port
  457. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  458. return NewListenerWithAddr(t, addr)
  459. }
  460. func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
  461. l, err := transport.NewUnixListener(addr)
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. return l
  466. }
  467. type member struct {
  468. etcdserver.ServerConfig
  469. PeerListeners, ClientListeners []net.Listener
  470. grpcListener net.Listener
  471. // PeerTLSInfo enables peer TLS when set
  472. PeerTLSInfo *transport.TLSInfo
  473. // ClientTLSInfo enables client TLS when set
  474. ClientTLSInfo *transport.TLSInfo
  475. raftHandler *testutil.PauseableHandler
  476. s *etcdserver.EtcdServer
  477. serverClosers []func()
  478. grpcServerOpts []grpc.ServerOption
  479. grpcServer *grpc.Server
  480. grpcServerPeer *grpc.Server
  481. grpcAddr string
  482. grpcBridge *bridge
  483. // serverClient is a clientv3 that directly calls the etcdserver.
  484. serverClient *clientv3.Client
  485. keepDataDirTerminate bool
  486. clientMaxCallSendMsgSize int
  487. clientMaxCallRecvMsgSize int
  488. useIP bool
  489. }
  490. func (m *member) GRPCAddr() string { return m.grpcAddr }
  491. type memberConfig struct {
  492. name string
  493. peerTLS *transport.TLSInfo
  494. clientTLS *transport.TLSInfo
  495. authToken string
  496. quotaBackendBytes int64
  497. maxTxnOps uint
  498. maxRequestBytes uint
  499. snapshotCount uint64
  500. snapshotCatchUpEntries uint64
  501. grpcKeepAliveMinTime time.Duration
  502. grpcKeepAliveInterval time.Duration
  503. grpcKeepAliveTimeout time.Duration
  504. clientMaxCallSendMsgSize int
  505. clientMaxCallRecvMsgSize int
  506. useIP bool
  507. }
  508. // mustNewMember return an inited member with the given name. If peerTLS is
  509. // set, it will use https scheme to communicate between peers.
  510. func mustNewMember(t *testing.T, mcfg memberConfig) *member {
  511. var err error
  512. m := &member{}
  513. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  514. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  515. pln := newLocalListener(t)
  516. m.PeerListeners = []net.Listener{pln}
  517. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  518. if err != nil {
  519. t.Fatal(err)
  520. }
  521. m.PeerTLSInfo = mcfg.peerTLS
  522. cln := newLocalListener(t)
  523. m.ClientListeners = []net.Listener{cln}
  524. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  525. if err != nil {
  526. t.Fatal(err)
  527. }
  528. m.ClientTLSInfo = mcfg.clientTLS
  529. m.Name = mcfg.name
  530. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  535. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. m.InitialClusterToken = clusterName
  540. m.NewCluster = true
  541. m.BootstrapTimeout = 10 * time.Millisecond
  542. if m.PeerTLSInfo != nil {
  543. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  544. }
  545. m.ElectionTicks = electionTicks
  546. m.InitialElectionTickAdvance = true
  547. m.TickMs = uint(tickDuration / time.Millisecond)
  548. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  549. m.MaxTxnOps = mcfg.maxTxnOps
  550. if m.MaxTxnOps == 0 {
  551. m.MaxTxnOps = embed.DefaultMaxTxnOps
  552. }
  553. m.MaxRequestBytes = mcfg.maxRequestBytes
  554. if m.MaxRequestBytes == 0 {
  555. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  556. }
  557. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  558. if mcfg.snapshotCount != 0 {
  559. m.SnapshotCount = mcfg.snapshotCount
  560. }
  561. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  562. if mcfg.snapshotCatchUpEntries != 0 {
  563. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  564. }
  565. // for the purpose of integration testing, simple token is enough
  566. m.AuthToken = "simple"
  567. if mcfg.authToken != "" {
  568. m.AuthToken = mcfg.authToken
  569. }
  570. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  571. m.grpcServerOpts = []grpc.ServerOption{}
  572. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  573. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  574. MinTime: mcfg.grpcKeepAliveMinTime,
  575. PermitWithoutStream: false,
  576. }))
  577. }
  578. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  579. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  580. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  581. Time: mcfg.grpcKeepAliveInterval,
  582. Timeout: mcfg.grpcKeepAliveTimeout,
  583. }))
  584. }
  585. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  586. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  587. m.useIP = mcfg.useIP
  588. m.InitialCorruptCheck = true
  589. m.LoggerConfig = &zap.Config{
  590. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  591. Development: false,
  592. Sampling: &zap.SamplingConfig{
  593. Initial: 100,
  594. Thereafter: 100,
  595. },
  596. Encoding: "json",
  597. EncoderConfig: zap.NewProductionEncoderConfig(),
  598. OutputPaths: []string{"/dev/null"},
  599. ErrorOutputPaths: []string{"/dev/null"},
  600. }
  601. if os.Getenv("CLUSTER_DEBUG") != "" {
  602. m.LoggerConfig.OutputPaths = []string{"stderr"}
  603. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  604. }
  605. m.Logger, err = m.LoggerConfig.Build()
  606. if err != nil {
  607. t.Fatal(err)
  608. }
  609. return m
  610. }
  611. // listenGRPC starts a grpc server over a unix domain socket on the member
  612. func (m *member) listenGRPC() error {
  613. // prefix with localhost so cert has right domain
  614. m.grpcAddr = "localhost:" + m.Name
  615. if m.useIP { // for IP-only TLS certs
  616. m.grpcAddr = "127.0.0.1:" + m.Name
  617. }
  618. l, err := transport.NewUnixListener(m.grpcAddr)
  619. if err != nil {
  620. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  621. }
  622. m.grpcBridge, err = newBridge(m.grpcAddr)
  623. if err != nil {
  624. l.Close()
  625. return err
  626. }
  627. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  628. m.grpcListener = l
  629. return nil
  630. }
  631. func (m *member) ElectionTimeout() time.Duration {
  632. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  633. }
  634. func (m *member) ID() types.ID { return m.s.ID() }
  635. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  636. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  637. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  638. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  639. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  640. // NewClientV3 creates a new grpc client connection to the member
  641. func NewClientV3(m *member) (*clientv3.Client, error) {
  642. if m.grpcAddr == "" {
  643. return nil, fmt.Errorf("member not configured for grpc")
  644. }
  645. cfg := clientv3.Config{
  646. Endpoints: []string{m.grpcAddr},
  647. DialTimeout: 5 * time.Second,
  648. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  649. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  650. }
  651. if m.ClientTLSInfo != nil {
  652. tls, err := m.ClientTLSInfo.ClientConfig()
  653. if err != nil {
  654. return nil, err
  655. }
  656. cfg.TLS = tls
  657. }
  658. return newClientV3(cfg)
  659. }
  660. // Clone returns a member with the same server configuration. The returned
  661. // member will not set PeerListeners and ClientListeners.
  662. func (m *member) Clone(t *testing.T) *member {
  663. mm := &member{}
  664. mm.ServerConfig = m.ServerConfig
  665. var err error
  666. clientURLStrs := m.ClientURLs.StringSlice()
  667. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  668. if err != nil {
  669. // this should never fail
  670. panic(err)
  671. }
  672. peerURLStrs := m.PeerURLs.StringSlice()
  673. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  674. if err != nil {
  675. // this should never fail
  676. panic(err)
  677. }
  678. clusterStr := m.InitialPeerURLsMap.String()
  679. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  680. if err != nil {
  681. // this should never fail
  682. panic(err)
  683. }
  684. mm.InitialClusterToken = m.InitialClusterToken
  685. mm.ElectionTicks = m.ElectionTicks
  686. mm.PeerTLSInfo = m.PeerTLSInfo
  687. mm.ClientTLSInfo = m.ClientTLSInfo
  688. return mm
  689. }
  690. // Launch starts a member based on ServerConfig, PeerListeners
  691. // and ClientListeners.
  692. func (m *member) Launch() error {
  693. lg.Info(
  694. "launching a member",
  695. zap.String("name", m.Name),
  696. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  697. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  698. zap.String("grpc-address", m.grpcAddr),
  699. )
  700. var err error
  701. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  702. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  703. }
  704. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  705. m.s.Start()
  706. var peerTLScfg *tls.Config
  707. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  708. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  709. return err
  710. }
  711. }
  712. if m.grpcListener != nil {
  713. var (
  714. tlscfg *tls.Config
  715. )
  716. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  717. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  718. if err != nil {
  719. return err
  720. }
  721. }
  722. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  723. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  724. m.serverClient = v3client.New(m.s)
  725. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  726. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  727. go m.grpcServer.Serve(m.grpcListener)
  728. }
  729. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  730. h := (http.Handler)(m.raftHandler)
  731. if m.grpcListener != nil {
  732. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  733. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  734. m.grpcServerPeer.ServeHTTP(w, r)
  735. } else {
  736. m.raftHandler.ServeHTTP(w, r)
  737. }
  738. })
  739. }
  740. for _, ln := range m.PeerListeners {
  741. cm := cmux.New(ln)
  742. // don't hang on matcher after closing listener
  743. cm.SetReadTimeout(time.Second)
  744. if m.grpcServer != nil {
  745. grpcl := cm.Match(cmux.HTTP2())
  746. go m.grpcServerPeer.Serve(grpcl)
  747. }
  748. // serve http1/http2 rafthttp/grpc
  749. ll := cm.Match(cmux.Any())
  750. if peerTLScfg != nil {
  751. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  752. return err
  753. }
  754. }
  755. hs := &httptest.Server{
  756. Listener: ll,
  757. Config: &http.Server{
  758. Handler: h,
  759. TLSConfig: peerTLScfg,
  760. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  761. },
  762. TLS: peerTLScfg,
  763. }
  764. hs.Start()
  765. donec := make(chan struct{})
  766. go func() {
  767. defer close(donec)
  768. cm.Serve()
  769. }()
  770. closer := func() {
  771. ll.Close()
  772. hs.CloseClientConnections()
  773. hs.Close()
  774. <-donec
  775. }
  776. m.serverClosers = append(m.serverClosers, closer)
  777. }
  778. for _, ln := range m.ClientListeners {
  779. hs := &httptest.Server{
  780. Listener: ln,
  781. Config: &http.Server{
  782. Handler: v2http.NewClientHandler(
  783. m.Logger,
  784. m.s,
  785. m.ServerConfig.ReqTimeout(),
  786. ),
  787. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  788. },
  789. }
  790. if m.ClientTLSInfo == nil {
  791. hs.Start()
  792. } else {
  793. info := m.ClientTLSInfo
  794. hs.TLS, err = info.ServerConfig()
  795. if err != nil {
  796. return err
  797. }
  798. // baseConfig is called on initial TLS handshake start.
  799. //
  800. // Previously,
  801. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  802. // 2. Server calls (*tls.Config).GetCertificate iff:
  803. // - Server's (*tls.Config).Certificates is not empty, or
  804. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  805. //
  806. // When (*tls.Config).Certificates is always populated on initial handshake,
  807. // client is expected to provide a valid matching SNI to pass the TLS
  808. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  809. // TLS assets. However, a cert whose SAN field does not include domain names
  810. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  811. // it was never able to trigger TLS reload on initial handshake; first
  812. // ceritifcate object was being used, never being updated.
  813. //
  814. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  815. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  816. // rest of the certificates on every new TLS connection, even when client
  817. // SNI is empty (e.g. cert only includes IPs).
  818. //
  819. // This introduces another problem with "httptest.Server":
  820. // when server initial certificates are empty, certificates
  821. // are overwritten by Go's internal test certs, which have
  822. // different SAN fields (e.g. example.com). To work around,
  823. // re-overwrite (*tls.Config).Certificates before starting
  824. // test server.
  825. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  826. if err != nil {
  827. return err
  828. }
  829. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  830. hs.StartTLS()
  831. }
  832. closer := func() {
  833. ln.Close()
  834. hs.CloseClientConnections()
  835. hs.Close()
  836. }
  837. m.serverClosers = append(m.serverClosers, closer)
  838. }
  839. lg.Info(
  840. "launched a member",
  841. zap.String("name", m.Name),
  842. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  843. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  844. zap.String("grpc-address", m.grpcAddr),
  845. )
  846. return nil
  847. }
  848. func (m *member) WaitOK(t *testing.T) {
  849. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  850. kapi := client.NewKeysAPI(cc)
  851. for {
  852. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  853. _, err := kapi.Get(ctx, "/", nil)
  854. if err != nil {
  855. time.Sleep(tickDuration)
  856. continue
  857. }
  858. cancel()
  859. break
  860. }
  861. for m.s.Leader() == 0 {
  862. time.Sleep(tickDuration)
  863. }
  864. }
  865. func (m *member) URL() string { return m.ClientURLs[0].String() }
  866. func (m *member) Pause() {
  867. m.raftHandler.Pause()
  868. m.s.PauseSending()
  869. }
  870. func (m *member) Resume() {
  871. m.raftHandler.Resume()
  872. m.s.ResumeSending()
  873. }
  874. // Close stops the member's etcdserver and closes its connections
  875. func (m *member) Close() {
  876. if m.grpcBridge != nil {
  877. m.grpcBridge.Close()
  878. m.grpcBridge = nil
  879. }
  880. if m.serverClient != nil {
  881. m.serverClient.Close()
  882. m.serverClient = nil
  883. }
  884. if m.grpcServer != nil {
  885. m.grpcServer.Stop()
  886. m.grpcServer.GracefulStop()
  887. m.grpcServer = nil
  888. m.grpcServerPeer.Stop()
  889. m.grpcServerPeer.GracefulStop()
  890. m.grpcServerPeer = nil
  891. }
  892. m.s.HardStop()
  893. for _, f := range m.serverClosers {
  894. f()
  895. }
  896. }
  897. // Stop stops the member, but the data dir of the member is preserved.
  898. func (m *member) Stop(t *testing.T) {
  899. lg.Info(
  900. "stopping a member",
  901. zap.String("name", m.Name),
  902. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  903. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  904. zap.String("grpc-address", m.grpcAddr),
  905. )
  906. m.Close()
  907. m.serverClosers = nil
  908. lg.Info(
  909. "stopped a member",
  910. zap.String("name", m.Name),
  911. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  912. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  913. zap.String("grpc-address", m.grpcAddr),
  914. )
  915. }
  916. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  917. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  918. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  919. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  920. time.Sleep(interval)
  921. }
  922. return m.s.Lead()
  923. }
  924. // StopNotify unblocks when a member stop completes
  925. func (m *member) StopNotify() <-chan struct{} {
  926. return m.s.StopNotify()
  927. }
  928. // Restart starts the member using the preserved data dir.
  929. func (m *member) Restart(t *testing.T) error {
  930. lg.Info(
  931. "restarting a member",
  932. zap.String("name", m.Name),
  933. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  934. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  935. zap.String("grpc-address", m.grpcAddr),
  936. )
  937. newPeerListeners := make([]net.Listener, 0)
  938. for _, ln := range m.PeerListeners {
  939. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  940. }
  941. m.PeerListeners = newPeerListeners
  942. newClientListeners := make([]net.Listener, 0)
  943. for _, ln := range m.ClientListeners {
  944. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  945. }
  946. m.ClientListeners = newClientListeners
  947. if m.grpcListener != nil {
  948. if err := m.listenGRPC(); err != nil {
  949. t.Fatal(err)
  950. }
  951. }
  952. err := m.Launch()
  953. lg.Info(
  954. "restarted a member",
  955. zap.String("name", m.Name),
  956. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  957. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  958. zap.String("grpc-address", m.grpcAddr),
  959. zap.Error(err),
  960. )
  961. return err
  962. }
  963. // Terminate stops the member and removes the data dir.
  964. func (m *member) Terminate(t *testing.T) {
  965. lg.Info(
  966. "terminating a member",
  967. zap.String("name", m.Name),
  968. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  969. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  970. zap.String("grpc-address", m.grpcAddr),
  971. )
  972. m.Close()
  973. if !m.keepDataDirTerminate {
  974. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  975. t.Fatal(err)
  976. }
  977. }
  978. lg.Info(
  979. "terminated a member",
  980. zap.String("name", m.Name),
  981. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  982. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  983. zap.String("grpc-address", m.grpcAddr),
  984. )
  985. }
  986. // Metric gets the metric value for a member
  987. func (m *member) Metric(metricName string) (string, error) {
  988. cfgtls := transport.TLSInfo{}
  989. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  990. if err != nil {
  991. return "", err
  992. }
  993. cli := &http.Client{Transport: tr}
  994. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  995. if err != nil {
  996. return "", err
  997. }
  998. defer resp.Body.Close()
  999. b, rerr := ioutil.ReadAll(resp.Body)
  1000. if rerr != nil {
  1001. return "", rerr
  1002. }
  1003. lines := strings.Split(string(b), "\n")
  1004. for _, l := range lines {
  1005. if strings.HasPrefix(l, metricName) {
  1006. return strings.Split(l, " ")[1], nil
  1007. }
  1008. }
  1009. return "", nil
  1010. }
  1011. // InjectPartition drops connections from m to others, vice versa.
  1012. func (m *member) InjectPartition(t *testing.T, others ...*member) {
  1013. for _, other := range others {
  1014. m.s.CutPeer(other.s.ID())
  1015. other.s.CutPeer(m.s.ID())
  1016. }
  1017. }
  1018. // RecoverPartition recovers connections from m to others, vice versa.
  1019. func (m *member) RecoverPartition(t *testing.T, others ...*member) {
  1020. for _, other := range others {
  1021. m.s.MendPeer(other.s.ID())
  1022. other.s.MendPeer(m.s.ID())
  1023. }
  1024. }
  1025. func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
  1026. cfgtls := transport.TLSInfo{}
  1027. if tls != nil {
  1028. cfgtls = *tls
  1029. }
  1030. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1031. c, err := client.New(cfg)
  1032. if err != nil {
  1033. t.Fatal(err)
  1034. }
  1035. return c
  1036. }
  1037. func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
  1038. // tick in integration test is short, so 1s dial timeout could play well.
  1039. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1040. if err != nil {
  1041. t.Fatal(err)
  1042. }
  1043. return tr
  1044. }
  1045. type SortableMemberSliceByPeerURLs []client.Member
  1046. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1047. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1048. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1049. }
  1050. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1051. type ClusterV3 struct {
  1052. *cluster
  1053. mu sync.Mutex
  1054. clients []*clientv3.Client
  1055. }
  1056. // NewClusterV3 returns a launched cluster with a grpc client connection
  1057. // for each cluster member.
  1058. func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1059. cfg.UseGRPC = true
  1060. if os.Getenv("CLIENT_DEBUG") != "" {
  1061. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1062. }
  1063. clus := &ClusterV3{
  1064. cluster: NewClusterByConfig(t, cfg),
  1065. }
  1066. clus.Launch(t)
  1067. if !cfg.SkipCreatingClient {
  1068. for _, m := range clus.Members {
  1069. client, err := NewClientV3(m)
  1070. if err != nil {
  1071. t.Fatalf("cannot create client: %v", err)
  1072. }
  1073. clus.clients = append(clus.clients, client)
  1074. }
  1075. }
  1076. return clus
  1077. }
  1078. func (c *ClusterV3) TakeClient(idx int) {
  1079. c.mu.Lock()
  1080. c.clients[idx] = nil
  1081. c.mu.Unlock()
  1082. }
  1083. func (c *ClusterV3) Terminate(t *testing.T) {
  1084. c.mu.Lock()
  1085. for _, client := range c.clients {
  1086. if client == nil {
  1087. continue
  1088. }
  1089. if err := client.Close(); err != nil {
  1090. t.Error(err)
  1091. }
  1092. }
  1093. c.mu.Unlock()
  1094. c.cluster.Terminate(t)
  1095. }
  1096. func (c *ClusterV3) RandClient() *clientv3.Client {
  1097. return c.clients[rand.Intn(len(c.clients))]
  1098. }
  1099. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1100. return c.clients[i]
  1101. }
  1102. type grpcAPI struct {
  1103. // Cluster is the cluster API for the client's connection.
  1104. Cluster pb.ClusterClient
  1105. // KV is the keyvalue API for the client's connection.
  1106. KV pb.KVClient
  1107. // Lease is the lease API for the client's connection.
  1108. Lease pb.LeaseClient
  1109. // Watch is the watch API for the client's connection.
  1110. Watch pb.WatchClient
  1111. // Maintenance is the maintenance API for the client's connection.
  1112. Maintenance pb.MaintenanceClient
  1113. // Auth is the authentication API for the client's connection.
  1114. Auth pb.AuthClient
  1115. // Lock is the lock API for the client's connection.
  1116. Lock lockpb.LockClient
  1117. // Election is the election API for the client's connection.
  1118. Election epb.ElectionClient
  1119. }