cluster.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "github.com/coreos/etcd/client"
  34. "github.com/coreos/etcd/clientv3"
  35. "github.com/coreos/etcd/embed"
  36. "github.com/coreos/etcd/etcdserver"
  37. "github.com/coreos/etcd/etcdserver/api/etcdhttp"
  38. "github.com/coreos/etcd/etcdserver/api/rafthttp"
  39. "github.com/coreos/etcd/etcdserver/api/v2http"
  40. "github.com/coreos/etcd/etcdserver/api/v3client"
  41. "github.com/coreos/etcd/etcdserver/api/v3election"
  42. epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  43. "github.com/coreos/etcd/etcdserver/api/v3lock"
  44. lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  46. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  47. "github.com/coreos/etcd/pkg/testutil"
  48. "github.com/coreos/etcd/pkg/tlsutil"
  49. "github.com/coreos/etcd/pkg/transport"
  50. "github.com/coreos/etcd/pkg/types"
  51. "github.com/soheilhy/cmux"
  52. "go.uber.org/zap"
  53. "golang.org/x/crypto/bcrypt"
  54. "google.golang.org/grpc"
  55. "google.golang.org/grpc/grpclog"
  56. "google.golang.org/grpc/keepalive"
  57. )
  58. const (
  59. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  60. RequestWaitTimeout = 3 * time.Second
  61. tickDuration = 10 * time.Millisecond
  62. requestTimeout = 20 * time.Second
  63. clusterName = "etcd"
  64. basePort = 21000
  65. UrlScheme = "unix"
  66. UrlSchemeTLS = "unixs"
  67. )
  68. var (
  69. electionTicks = 10
  70. // integration test uses unique ports, counting up, to listen for each
  71. // member, ensuring restarted members can listen on the same port again.
  72. localListenCount int64 = 0
  73. testTLSInfo = transport.TLSInfo{
  74. KeyFile: "./fixtures/server.key.insecure",
  75. CertFile: "./fixtures/server.crt",
  76. TrustedCAFile: "./fixtures/ca.crt",
  77. ClientCertAuth: true,
  78. }
  79. testTLSInfoIP = transport.TLSInfo{
  80. KeyFile: "./fixtures/server-ip.key.insecure",
  81. CertFile: "./fixtures/server-ip.crt",
  82. TrustedCAFile: "./fixtures/ca.crt",
  83. ClientCertAuth: true,
  84. }
  85. testTLSInfoExpired = transport.TLSInfo{
  86. KeyFile: "./fixtures-expired/server.key.insecure",
  87. CertFile: "./fixtures-expired/server.crt",
  88. TrustedCAFile: "./fixtures-expired/ca.crt",
  89. ClientCertAuth: true,
  90. }
  91. testTLSInfoExpiredIP = transport.TLSInfo{
  92. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  93. CertFile: "./fixtures-expired/server-ip.crt",
  94. TrustedCAFile: "./fixtures-expired/ca.crt",
  95. ClientCertAuth: true,
  96. }
  97. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  98. lg = zap.NewNop()
  99. )
  100. func init() {
  101. if os.Getenv("CLUSTER_DEBUG") != "" {
  102. lg, _ = zap.NewProduction()
  103. }
  104. }
  105. type ClusterConfig struct {
  106. Size int
  107. PeerTLS *transport.TLSInfo
  108. ClientTLS *transport.TLSInfo
  109. DiscoveryURL string
  110. AuthToken string
  111. UseGRPC bool
  112. QuotaBackendBytes int64
  113. MaxTxnOps uint
  114. MaxRequestBytes uint
  115. SnapshotCount uint64
  116. SnapshotCatchUpEntries uint64
  117. GRPCKeepAliveMinTime time.Duration
  118. GRPCKeepAliveInterval time.Duration
  119. GRPCKeepAliveTimeout time.Duration
  120. // SkipCreatingClient to skip creating clients for each member.
  121. SkipCreatingClient bool
  122. ClientMaxCallSendMsgSize int
  123. ClientMaxCallRecvMsgSize int
  124. // UseIP is true to use only IP for gRPC requests.
  125. UseIP bool
  126. }
  127. type cluster struct {
  128. cfg *ClusterConfig
  129. Members []*member
  130. }
  131. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  132. if tls == nil {
  133. return UrlScheme
  134. }
  135. return UrlSchemeTLS
  136. }
  137. func (c *cluster) fillClusterForMembers() error {
  138. if c.cfg.DiscoveryURL != "" {
  139. // cluster will be discovered
  140. return nil
  141. }
  142. addrs := make([]string, 0)
  143. for _, m := range c.Members {
  144. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  145. for _, l := range m.PeerListeners {
  146. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  147. }
  148. }
  149. clusterStr := strings.Join(addrs, ",")
  150. var err error
  151. for _, m := range c.Members {
  152. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  153. if err != nil {
  154. return err
  155. }
  156. }
  157. return nil
  158. }
  159. func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
  160. c := &cluster{cfg: cfg}
  161. ms := make([]*member, cfg.Size)
  162. for i := 0; i < cfg.Size; i++ {
  163. ms[i] = c.mustNewMember(t)
  164. }
  165. c.Members = ms
  166. if err := c.fillClusterForMembers(); err != nil {
  167. t.Fatal(err)
  168. }
  169. return c
  170. }
  171. // NewCluster returns an unlaunched cluster of the given size which has been
  172. // set to use static bootstrap.
  173. func NewCluster(t *testing.T, size int) *cluster {
  174. return newCluster(t, &ClusterConfig{Size: size})
  175. }
  176. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  177. func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
  178. return newCluster(t, cfg)
  179. }
  180. func (c *cluster) Launch(t *testing.T) {
  181. errc := make(chan error)
  182. for _, m := range c.Members {
  183. // Members are launched in separate goroutines because if they boot
  184. // using discovery url, they have to wait for others to register to continue.
  185. go func(m *member) {
  186. errc <- m.Launch()
  187. }(m)
  188. }
  189. for range c.Members {
  190. if err := <-errc; err != nil {
  191. t.Fatalf("error setting up member: %v", err)
  192. }
  193. }
  194. // wait cluster to be stable to receive future client requests
  195. c.waitMembersMatch(t, c.HTTPMembers())
  196. c.waitVersion()
  197. }
  198. func (c *cluster) URL(i int) string {
  199. return c.Members[i].ClientURLs[0].String()
  200. }
  201. // URLs returns a list of all active client URLs in the cluster
  202. func (c *cluster) URLs() []string {
  203. return getMembersURLs(c.Members)
  204. }
  205. func getMembersURLs(members []*member) []string {
  206. urls := make([]string, 0)
  207. for _, m := range members {
  208. select {
  209. case <-m.s.StopNotify():
  210. continue
  211. default:
  212. }
  213. for _, u := range m.ClientURLs {
  214. urls = append(urls, u.String())
  215. }
  216. }
  217. return urls
  218. }
  219. // HTTPMembers returns a list of all active members as client.Members
  220. func (c *cluster) HTTPMembers() []client.Member {
  221. ms := []client.Member{}
  222. for _, m := range c.Members {
  223. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  224. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  225. cm := client.Member{Name: m.Name}
  226. for _, ln := range m.PeerListeners {
  227. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  228. }
  229. for _, ln := range m.ClientListeners {
  230. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  231. }
  232. ms = append(ms, cm)
  233. }
  234. return ms
  235. }
  236. func (c *cluster) mustNewMember(t *testing.T) *member {
  237. m := mustNewMember(t,
  238. memberConfig{
  239. name: c.name(rand.Int()),
  240. authToken: c.cfg.AuthToken,
  241. peerTLS: c.cfg.PeerTLS,
  242. clientTLS: c.cfg.ClientTLS,
  243. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  244. maxTxnOps: c.cfg.MaxTxnOps,
  245. maxRequestBytes: c.cfg.MaxRequestBytes,
  246. snapshotCount: c.cfg.SnapshotCount,
  247. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  248. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  249. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  250. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  251. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  252. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  253. useIP: c.cfg.UseIP,
  254. })
  255. m.DiscoveryURL = c.cfg.DiscoveryURL
  256. if c.cfg.UseGRPC {
  257. if err := m.listenGRPC(); err != nil {
  258. t.Fatal(err)
  259. }
  260. }
  261. return m
  262. }
  263. func (c *cluster) addMember(t *testing.T) {
  264. m := c.mustNewMember(t)
  265. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  266. // send add request to the cluster
  267. var err error
  268. for i := 0; i < len(c.Members); i++ {
  269. clientURL := c.URL(i)
  270. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  271. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  272. break
  273. }
  274. }
  275. if err != nil {
  276. t.Fatalf("add member failed on all members error: %v", err)
  277. }
  278. m.InitialPeerURLsMap = types.URLsMap{}
  279. for _, mm := range c.Members {
  280. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  281. }
  282. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  283. m.NewCluster = false
  284. if err := m.Launch(); err != nil {
  285. t.Fatal(err)
  286. }
  287. c.Members = append(c.Members, m)
  288. // wait cluster to be stable to receive future client requests
  289. c.waitMembersMatch(t, c.HTTPMembers())
  290. }
  291. func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
  292. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  293. ma := client.NewMembersAPI(cc)
  294. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  295. _, err := ma.Add(ctx, peerURL)
  296. cancel()
  297. if err != nil {
  298. return err
  299. }
  300. // wait for the add node entry applied in the cluster
  301. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  302. c.waitMembersMatch(t, members)
  303. return nil
  304. }
  305. func (c *cluster) AddMember(t *testing.T) {
  306. c.addMember(t)
  307. }
  308. func (c *cluster) RemoveMember(t *testing.T, id uint64) {
  309. if err := c.removeMember(t, id); err != nil {
  310. t.Fatal(err)
  311. }
  312. }
  313. func (c *cluster) removeMember(t *testing.T, id uint64) error {
  314. // send remove request to the cluster
  315. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  316. ma := client.NewMembersAPI(cc)
  317. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  318. err := ma.Remove(ctx, types.ID(id).String())
  319. cancel()
  320. if err != nil {
  321. return err
  322. }
  323. newMembers := make([]*member, 0)
  324. for _, m := range c.Members {
  325. if uint64(m.s.ID()) != id {
  326. newMembers = append(newMembers, m)
  327. } else {
  328. select {
  329. case <-m.s.StopNotify():
  330. m.Terminate(t)
  331. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  332. // TODO: remove connection write timeout by selecting on http response closeNotifier
  333. // blocking on https://github.com/golang/go/issues/9524
  334. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  335. t.Fatalf("failed to remove member %s in time", m.s.ID())
  336. }
  337. }
  338. }
  339. c.Members = newMembers
  340. c.waitMembersMatch(t, c.HTTPMembers())
  341. return nil
  342. }
  343. func (c *cluster) Terminate(t *testing.T) {
  344. var wg sync.WaitGroup
  345. wg.Add(len(c.Members))
  346. for _, m := range c.Members {
  347. go func(mm *member) {
  348. defer wg.Done()
  349. mm.Terminate(t)
  350. }(m)
  351. }
  352. wg.Wait()
  353. }
  354. func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
  355. for _, u := range c.URLs() {
  356. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  357. ma := client.NewMembersAPI(cc)
  358. for {
  359. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  360. ms, err := ma.List(ctx)
  361. cancel()
  362. if err == nil && isMembersEqual(ms, membs) {
  363. break
  364. }
  365. time.Sleep(tickDuration)
  366. }
  367. }
  368. }
  369. func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
  370. // waitLeader waits until given members agree on the same leader.
  371. func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
  372. possibleLead := make(map[uint64]bool)
  373. var lead uint64
  374. for _, m := range membs {
  375. possibleLead[uint64(m.s.ID())] = true
  376. }
  377. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  378. kapi := client.NewKeysAPI(cc)
  379. // ensure leader is up via linearizable get
  380. for {
  381. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  382. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  383. cancel()
  384. if err == nil || strings.Contains(err.Error(), "Key not found") {
  385. break
  386. }
  387. }
  388. for lead == 0 || !possibleLead[lead] {
  389. lead = 0
  390. for _, m := range membs {
  391. select {
  392. case <-m.s.StopNotify():
  393. continue
  394. default:
  395. }
  396. if lead != 0 && lead != m.s.Lead() {
  397. lead = 0
  398. time.Sleep(10 * tickDuration)
  399. break
  400. }
  401. lead = m.s.Lead()
  402. }
  403. }
  404. for i, m := range membs {
  405. if uint64(m.s.ID()) == lead {
  406. return i
  407. }
  408. }
  409. return -1
  410. }
  411. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  412. // waitNoLeader waits until given members lose leader.
  413. func (c *cluster) waitNoLeader(membs []*member) {
  414. noLeader := false
  415. for !noLeader {
  416. noLeader = true
  417. for _, m := range membs {
  418. select {
  419. case <-m.s.StopNotify():
  420. continue
  421. default:
  422. }
  423. if m.s.Lead() != 0 {
  424. noLeader = false
  425. time.Sleep(10 * tickDuration)
  426. break
  427. }
  428. }
  429. }
  430. }
  431. func (c *cluster) waitVersion() {
  432. for _, m := range c.Members {
  433. for {
  434. if m.s.ClusterVersion() != nil {
  435. break
  436. }
  437. time.Sleep(tickDuration)
  438. }
  439. }
  440. }
  441. func (c *cluster) name(i int) string {
  442. return fmt.Sprint(i)
  443. }
  444. // isMembersEqual checks whether two members equal except ID field.
  445. // The given wmembs should always set ID field to empty string.
  446. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  447. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  448. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  449. for i := range membs {
  450. membs[i].ID = ""
  451. }
  452. return reflect.DeepEqual(membs, wmembs)
  453. }
  454. func newLocalListener(t *testing.T) net.Listener {
  455. c := atomic.AddInt64(&localListenCount, 1)
  456. // Go 1.8+ allows only numbers in port
  457. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  458. return NewListenerWithAddr(t, addr)
  459. }
  460. func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
  461. l, err := transport.NewUnixListener(addr)
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. return l
  466. }
  467. type member struct {
  468. etcdserver.ServerConfig
  469. PeerListeners, ClientListeners []net.Listener
  470. grpcListener net.Listener
  471. // PeerTLSInfo enables peer TLS when set
  472. PeerTLSInfo *transport.TLSInfo
  473. // ClientTLSInfo enables client TLS when set
  474. ClientTLSInfo *transport.TLSInfo
  475. DialOptions []grpc.DialOption
  476. raftHandler *testutil.PauseableHandler
  477. s *etcdserver.EtcdServer
  478. serverClosers []func()
  479. grpcServerOpts []grpc.ServerOption
  480. grpcServer *grpc.Server
  481. grpcServerPeer *grpc.Server
  482. grpcAddr string
  483. grpcBridge *bridge
  484. // serverClient is a clientv3 that directly calls the etcdserver.
  485. serverClient *clientv3.Client
  486. keepDataDirTerminate bool
  487. clientMaxCallSendMsgSize int
  488. clientMaxCallRecvMsgSize int
  489. useIP bool
  490. }
  491. func (m *member) GRPCAddr() string { return m.grpcAddr }
  492. type memberConfig struct {
  493. name string
  494. peerTLS *transport.TLSInfo
  495. clientTLS *transport.TLSInfo
  496. authToken string
  497. quotaBackendBytes int64
  498. maxTxnOps uint
  499. maxRequestBytes uint
  500. snapshotCount uint64
  501. snapshotCatchUpEntries uint64
  502. grpcKeepAliveMinTime time.Duration
  503. grpcKeepAliveInterval time.Duration
  504. grpcKeepAliveTimeout time.Duration
  505. clientMaxCallSendMsgSize int
  506. clientMaxCallRecvMsgSize int
  507. useIP bool
  508. }
  509. // mustNewMember return an inited member with the given name. If peerTLS is
  510. // set, it will use https scheme to communicate between peers.
  511. func mustNewMember(t *testing.T, mcfg memberConfig) *member {
  512. var err error
  513. m := &member{}
  514. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  515. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  516. pln := newLocalListener(t)
  517. m.PeerListeners = []net.Listener{pln}
  518. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. m.PeerTLSInfo = mcfg.peerTLS
  523. cln := newLocalListener(t)
  524. m.ClientListeners = []net.Listener{cln}
  525. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  526. if err != nil {
  527. t.Fatal(err)
  528. }
  529. m.ClientTLSInfo = mcfg.clientTLS
  530. m.Name = mcfg.name
  531. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  532. if err != nil {
  533. t.Fatal(err)
  534. }
  535. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  536. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  537. if err != nil {
  538. t.Fatal(err)
  539. }
  540. m.InitialClusterToken = clusterName
  541. m.NewCluster = true
  542. m.BootstrapTimeout = 10 * time.Millisecond
  543. if m.PeerTLSInfo != nil {
  544. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  545. }
  546. m.ElectionTicks = electionTicks
  547. m.InitialElectionTickAdvance = true
  548. m.TickMs = uint(tickDuration / time.Millisecond)
  549. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  550. m.MaxTxnOps = mcfg.maxTxnOps
  551. if m.MaxTxnOps == 0 {
  552. m.MaxTxnOps = embed.DefaultMaxTxnOps
  553. }
  554. m.MaxRequestBytes = mcfg.maxRequestBytes
  555. if m.MaxRequestBytes == 0 {
  556. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  557. }
  558. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  559. if mcfg.snapshotCount != 0 {
  560. m.SnapshotCount = mcfg.snapshotCount
  561. }
  562. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  563. if mcfg.snapshotCatchUpEntries != 0 {
  564. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  565. }
  566. // for the purpose of integration testing, simple token is enough
  567. m.AuthToken = "simple"
  568. if mcfg.authToken != "" {
  569. m.AuthToken = mcfg.authToken
  570. }
  571. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  572. m.grpcServerOpts = []grpc.ServerOption{}
  573. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  574. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  575. MinTime: mcfg.grpcKeepAliveMinTime,
  576. PermitWithoutStream: false,
  577. }))
  578. }
  579. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  580. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  581. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  582. Time: mcfg.grpcKeepAliveInterval,
  583. Timeout: mcfg.grpcKeepAliveTimeout,
  584. }))
  585. }
  586. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  587. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  588. m.useIP = mcfg.useIP
  589. m.InitialCorruptCheck = true
  590. m.LoggerConfig = &zap.Config{
  591. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  592. Development: false,
  593. Sampling: &zap.SamplingConfig{
  594. Initial: 100,
  595. Thereafter: 100,
  596. },
  597. Encoding: "json",
  598. EncoderConfig: zap.NewProductionEncoderConfig(),
  599. OutputPaths: []string{"/dev/null"},
  600. ErrorOutputPaths: []string{"/dev/null"},
  601. }
  602. if os.Getenv("CLUSTER_DEBUG") != "" {
  603. m.LoggerConfig.OutputPaths = []string{"stderr"}
  604. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  605. }
  606. m.Logger, err = m.LoggerConfig.Build()
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. return m
  611. }
  612. // listenGRPC starts a grpc server over a unix domain socket on the member
  613. func (m *member) listenGRPC() error {
  614. // prefix with localhost so cert has right domain
  615. m.grpcAddr = "localhost:" + m.Name
  616. if m.useIP { // for IP-only TLS certs
  617. m.grpcAddr = "127.0.0.1:" + m.Name
  618. }
  619. l, err := transport.NewUnixListener(m.grpcAddr)
  620. if err != nil {
  621. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  622. }
  623. m.grpcBridge, err = newBridge(m.grpcAddr)
  624. if err != nil {
  625. l.Close()
  626. return err
  627. }
  628. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  629. m.grpcListener = l
  630. return nil
  631. }
  632. func (m *member) ElectionTimeout() time.Duration {
  633. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  634. }
  635. func (m *member) ID() types.ID { return m.s.ID() }
  636. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  637. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  638. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  639. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  640. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  641. // NewClientV3 creates a new grpc client connection to the member
  642. func NewClientV3(m *member) (*clientv3.Client, error) {
  643. if m.grpcAddr == "" {
  644. return nil, fmt.Errorf("member not configured for grpc")
  645. }
  646. cfg := clientv3.Config{
  647. Endpoints: []string{m.grpcAddr},
  648. DialTimeout: 5 * time.Second,
  649. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  650. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  651. }
  652. if m.ClientTLSInfo != nil {
  653. tls, err := m.ClientTLSInfo.ClientConfig()
  654. if err != nil {
  655. return nil, err
  656. }
  657. cfg.TLS = tls
  658. }
  659. if m.DialOptions != nil {
  660. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  661. }
  662. return newClientV3(cfg)
  663. }
  664. // Clone returns a member with the same server configuration. The returned
  665. // member will not set PeerListeners and ClientListeners.
  666. func (m *member) Clone(t *testing.T) *member {
  667. mm := &member{}
  668. mm.ServerConfig = m.ServerConfig
  669. var err error
  670. clientURLStrs := m.ClientURLs.StringSlice()
  671. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  672. if err != nil {
  673. // this should never fail
  674. panic(err)
  675. }
  676. peerURLStrs := m.PeerURLs.StringSlice()
  677. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  678. if err != nil {
  679. // this should never fail
  680. panic(err)
  681. }
  682. clusterStr := m.InitialPeerURLsMap.String()
  683. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  684. if err != nil {
  685. // this should never fail
  686. panic(err)
  687. }
  688. mm.InitialClusterToken = m.InitialClusterToken
  689. mm.ElectionTicks = m.ElectionTicks
  690. mm.PeerTLSInfo = m.PeerTLSInfo
  691. mm.ClientTLSInfo = m.ClientTLSInfo
  692. return mm
  693. }
  694. // Launch starts a member based on ServerConfig, PeerListeners
  695. // and ClientListeners.
  696. func (m *member) Launch() error {
  697. lg.Info(
  698. "launching a member",
  699. zap.String("name", m.Name),
  700. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  701. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  702. zap.String("grpc-address", m.grpcAddr),
  703. )
  704. var err error
  705. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  706. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  707. }
  708. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  709. m.s.Start()
  710. var peerTLScfg *tls.Config
  711. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  712. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  713. return err
  714. }
  715. }
  716. if m.grpcListener != nil {
  717. var (
  718. tlscfg *tls.Config
  719. )
  720. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  721. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  722. if err != nil {
  723. return err
  724. }
  725. }
  726. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  727. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  728. m.serverClient = v3client.New(m.s)
  729. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  730. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  731. go m.grpcServer.Serve(m.grpcListener)
  732. }
  733. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  734. h := (http.Handler)(m.raftHandler)
  735. if m.grpcListener != nil {
  736. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  737. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  738. m.grpcServerPeer.ServeHTTP(w, r)
  739. } else {
  740. m.raftHandler.ServeHTTP(w, r)
  741. }
  742. })
  743. }
  744. for _, ln := range m.PeerListeners {
  745. cm := cmux.New(ln)
  746. // don't hang on matcher after closing listener
  747. cm.SetReadTimeout(time.Second)
  748. if m.grpcServer != nil {
  749. grpcl := cm.Match(cmux.HTTP2())
  750. go m.grpcServerPeer.Serve(grpcl)
  751. }
  752. // serve http1/http2 rafthttp/grpc
  753. ll := cm.Match(cmux.Any())
  754. if peerTLScfg != nil {
  755. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  756. return err
  757. }
  758. }
  759. hs := &httptest.Server{
  760. Listener: ll,
  761. Config: &http.Server{
  762. Handler: h,
  763. TLSConfig: peerTLScfg,
  764. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  765. },
  766. TLS: peerTLScfg,
  767. }
  768. hs.Start()
  769. donec := make(chan struct{})
  770. go func() {
  771. defer close(donec)
  772. cm.Serve()
  773. }()
  774. closer := func() {
  775. ll.Close()
  776. hs.CloseClientConnections()
  777. hs.Close()
  778. <-donec
  779. }
  780. m.serverClosers = append(m.serverClosers, closer)
  781. }
  782. for _, ln := range m.ClientListeners {
  783. hs := &httptest.Server{
  784. Listener: ln,
  785. Config: &http.Server{
  786. Handler: v2http.NewClientHandler(
  787. m.Logger,
  788. m.s,
  789. m.ServerConfig.ReqTimeout(),
  790. ),
  791. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  792. },
  793. }
  794. if m.ClientTLSInfo == nil {
  795. hs.Start()
  796. } else {
  797. info := m.ClientTLSInfo
  798. hs.TLS, err = info.ServerConfig()
  799. if err != nil {
  800. return err
  801. }
  802. // baseConfig is called on initial TLS handshake start.
  803. //
  804. // Previously,
  805. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  806. // 2. Server calls (*tls.Config).GetCertificate iff:
  807. // - Server's (*tls.Config).Certificates is not empty, or
  808. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  809. //
  810. // When (*tls.Config).Certificates is always populated on initial handshake,
  811. // client is expected to provide a valid matching SNI to pass the TLS
  812. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  813. // TLS assets. However, a cert whose SAN field does not include domain names
  814. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  815. // it was never able to trigger TLS reload on initial handshake; first
  816. // ceritifcate object was being used, never being updated.
  817. //
  818. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  819. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  820. // rest of the certificates on every new TLS connection, even when client
  821. // SNI is empty (e.g. cert only includes IPs).
  822. //
  823. // This introduces another problem with "httptest.Server":
  824. // when server initial certificates are empty, certificates
  825. // are overwritten by Go's internal test certs, which have
  826. // different SAN fields (e.g. example.com). To work around,
  827. // re-overwrite (*tls.Config).Certificates before starting
  828. // test server.
  829. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  830. if err != nil {
  831. return err
  832. }
  833. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  834. hs.StartTLS()
  835. }
  836. closer := func() {
  837. ln.Close()
  838. hs.CloseClientConnections()
  839. hs.Close()
  840. }
  841. m.serverClosers = append(m.serverClosers, closer)
  842. }
  843. lg.Info(
  844. "launched a member",
  845. zap.String("name", m.Name),
  846. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  847. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  848. zap.String("grpc-address", m.grpcAddr),
  849. )
  850. return nil
  851. }
  852. func (m *member) WaitOK(t *testing.T) {
  853. m.WaitStarted(t)
  854. for m.s.Leader() == 0 {
  855. time.Sleep(tickDuration)
  856. }
  857. }
  858. func (m *member) WaitStarted(t *testing.T) {
  859. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  860. kapi := client.NewKeysAPI(cc)
  861. for {
  862. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  863. _, err := kapi.Get(ctx, "/", nil)
  864. if err != nil {
  865. time.Sleep(tickDuration)
  866. continue
  867. }
  868. cancel()
  869. break
  870. }
  871. }
  872. func (m *member) URL() string { return m.ClientURLs[0].String() }
  873. func (m *member) Pause() {
  874. m.raftHandler.Pause()
  875. m.s.PauseSending()
  876. }
  877. func (m *member) Resume() {
  878. m.raftHandler.Resume()
  879. m.s.ResumeSending()
  880. }
  881. // Close stops the member's etcdserver and closes its connections
  882. func (m *member) Close() {
  883. if m.grpcBridge != nil {
  884. m.grpcBridge.Close()
  885. m.grpcBridge = nil
  886. }
  887. if m.serverClient != nil {
  888. m.serverClient.Close()
  889. m.serverClient = nil
  890. }
  891. if m.grpcServer != nil {
  892. m.grpcServer.Stop()
  893. m.grpcServer.GracefulStop()
  894. m.grpcServer = nil
  895. m.grpcServerPeer.Stop()
  896. m.grpcServerPeer.GracefulStop()
  897. m.grpcServerPeer = nil
  898. }
  899. m.s.HardStop()
  900. for _, f := range m.serverClosers {
  901. f()
  902. }
  903. }
  904. // Stop stops the member, but the data dir of the member is preserved.
  905. func (m *member) Stop(t *testing.T) {
  906. lg.Info(
  907. "stopping a member",
  908. zap.String("name", m.Name),
  909. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  910. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  911. zap.String("grpc-address", m.grpcAddr),
  912. )
  913. m.Close()
  914. m.serverClosers = nil
  915. lg.Info(
  916. "stopped a member",
  917. zap.String("name", m.Name),
  918. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  919. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  920. zap.String("grpc-address", m.grpcAddr),
  921. )
  922. }
  923. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  924. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  925. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  926. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  927. time.Sleep(interval)
  928. }
  929. return m.s.Lead()
  930. }
  931. // StopNotify unblocks when a member stop completes
  932. func (m *member) StopNotify() <-chan struct{} {
  933. return m.s.StopNotify()
  934. }
  935. // Restart starts the member using the preserved data dir.
  936. func (m *member) Restart(t *testing.T) error {
  937. lg.Info(
  938. "restarting a member",
  939. zap.String("name", m.Name),
  940. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  941. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  942. zap.String("grpc-address", m.grpcAddr),
  943. )
  944. newPeerListeners := make([]net.Listener, 0)
  945. for _, ln := range m.PeerListeners {
  946. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  947. }
  948. m.PeerListeners = newPeerListeners
  949. newClientListeners := make([]net.Listener, 0)
  950. for _, ln := range m.ClientListeners {
  951. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  952. }
  953. m.ClientListeners = newClientListeners
  954. if m.grpcListener != nil {
  955. if err := m.listenGRPC(); err != nil {
  956. t.Fatal(err)
  957. }
  958. }
  959. err := m.Launch()
  960. lg.Info(
  961. "restarted a member",
  962. zap.String("name", m.Name),
  963. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  964. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  965. zap.String("grpc-address", m.grpcAddr),
  966. zap.Error(err),
  967. )
  968. return err
  969. }
  970. // Terminate stops the member and removes the data dir.
  971. func (m *member) Terminate(t *testing.T) {
  972. lg.Info(
  973. "terminating a member",
  974. zap.String("name", m.Name),
  975. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  976. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  977. zap.String("grpc-address", m.grpcAddr),
  978. )
  979. m.Close()
  980. if !m.keepDataDirTerminate {
  981. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  982. t.Fatal(err)
  983. }
  984. }
  985. lg.Info(
  986. "terminated a member",
  987. zap.String("name", m.Name),
  988. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  989. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  990. zap.String("grpc-address", m.grpcAddr),
  991. )
  992. }
  993. // Metric gets the metric value for a member
  994. func (m *member) Metric(metricName string) (string, error) {
  995. cfgtls := transport.TLSInfo{}
  996. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  997. if err != nil {
  998. return "", err
  999. }
  1000. cli := &http.Client{Transport: tr}
  1001. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1002. if err != nil {
  1003. return "", err
  1004. }
  1005. defer resp.Body.Close()
  1006. b, rerr := ioutil.ReadAll(resp.Body)
  1007. if rerr != nil {
  1008. return "", rerr
  1009. }
  1010. lines := strings.Split(string(b), "\n")
  1011. for _, l := range lines {
  1012. if strings.HasPrefix(l, metricName) {
  1013. return strings.Split(l, " ")[1], nil
  1014. }
  1015. }
  1016. return "", nil
  1017. }
  1018. // InjectPartition drops connections from m to others, vice versa.
  1019. func (m *member) InjectPartition(t *testing.T, others ...*member) {
  1020. for _, other := range others {
  1021. m.s.CutPeer(other.s.ID())
  1022. other.s.CutPeer(m.s.ID())
  1023. }
  1024. }
  1025. // RecoverPartition recovers connections from m to others, vice versa.
  1026. func (m *member) RecoverPartition(t *testing.T, others ...*member) {
  1027. for _, other := range others {
  1028. m.s.MendPeer(other.s.ID())
  1029. other.s.MendPeer(m.s.ID())
  1030. }
  1031. }
  1032. func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
  1033. cfgtls := transport.TLSInfo{}
  1034. if tls != nil {
  1035. cfgtls = *tls
  1036. }
  1037. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1038. c, err := client.New(cfg)
  1039. if err != nil {
  1040. t.Fatal(err)
  1041. }
  1042. return c
  1043. }
  1044. func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
  1045. // tick in integration test is short, so 1s dial timeout could play well.
  1046. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1047. if err != nil {
  1048. t.Fatal(err)
  1049. }
  1050. return tr
  1051. }
  1052. type SortableMemberSliceByPeerURLs []client.Member
  1053. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1054. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1055. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1056. }
  1057. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1058. type ClusterV3 struct {
  1059. *cluster
  1060. mu sync.Mutex
  1061. clients []*clientv3.Client
  1062. }
  1063. // NewClusterV3 returns a launched cluster with a grpc client connection
  1064. // for each cluster member.
  1065. func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1066. cfg.UseGRPC = true
  1067. if os.Getenv("CLIENT_DEBUG") != "" {
  1068. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1069. }
  1070. clus := &ClusterV3{
  1071. cluster: NewClusterByConfig(t, cfg),
  1072. }
  1073. clus.Launch(t)
  1074. if !cfg.SkipCreatingClient {
  1075. for _, m := range clus.Members {
  1076. client, err := NewClientV3(m)
  1077. if err != nil {
  1078. t.Fatalf("cannot create client: %v", err)
  1079. }
  1080. clus.clients = append(clus.clients, client)
  1081. }
  1082. }
  1083. return clus
  1084. }
  1085. func (c *ClusterV3) TakeClient(idx int) {
  1086. c.mu.Lock()
  1087. c.clients[idx] = nil
  1088. c.mu.Unlock()
  1089. }
  1090. func (c *ClusterV3) Terminate(t *testing.T) {
  1091. c.mu.Lock()
  1092. for _, client := range c.clients {
  1093. if client == nil {
  1094. continue
  1095. }
  1096. if err := client.Close(); err != nil {
  1097. t.Error(err)
  1098. }
  1099. }
  1100. c.mu.Unlock()
  1101. c.cluster.Terminate(t)
  1102. }
  1103. func (c *ClusterV3) RandClient() *clientv3.Client {
  1104. return c.clients[rand.Intn(len(c.clients))]
  1105. }
  1106. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1107. return c.clients[i]
  1108. }
  1109. type grpcAPI struct {
  1110. // Cluster is the cluster API for the client's connection.
  1111. Cluster pb.ClusterClient
  1112. // KV is the keyvalue API for the client's connection.
  1113. KV pb.KVClient
  1114. // Lease is the lease API for the client's connection.
  1115. Lease pb.LeaseClient
  1116. // Watch is the watch API for the client's connection.
  1117. Watch pb.WatchClient
  1118. // Maintenance is the maintenance API for the client's connection.
  1119. Maintenance pb.MaintenanceClient
  1120. // Auth is the authentication API for the client's connection.
  1121. Auth pb.AuthClient
  1122. // Lock is the lock API for the client's connection.
  1123. Lock lockpb.LockClient
  1124. // Election is the election API for the client's connection.
  1125. Election epb.ElectionClient
  1126. }