cluster.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "github.com/coreos/etcd/client"
  34. "github.com/coreos/etcd/clientv3"
  35. "github.com/coreos/etcd/embed"
  36. "github.com/coreos/etcd/etcdserver"
  37. "github.com/coreos/etcd/etcdserver/api/etcdhttp"
  38. "github.com/coreos/etcd/etcdserver/api/v2http"
  39. "github.com/coreos/etcd/etcdserver/api/v3client"
  40. "github.com/coreos/etcd/etcdserver/api/v3election"
  41. epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  42. "github.com/coreos/etcd/etcdserver/api/v3lock"
  43. lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  44. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  45. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  46. "github.com/coreos/etcd/pkg/testutil"
  47. "github.com/coreos/etcd/pkg/tlsutil"
  48. "github.com/coreos/etcd/pkg/transport"
  49. "github.com/coreos/etcd/pkg/types"
  50. "github.com/coreos/etcd/rafthttp"
  51. "github.com/soheilhy/cmux"
  52. "go.uber.org/zap"
  53. "golang.org/x/crypto/bcrypt"
  54. "google.golang.org/grpc"
  55. "google.golang.org/grpc/grpclog"
  56. "google.golang.org/grpc/keepalive"
  57. )
// Timing, naming and addressing constants shared by the integration cluster
// helpers in this file.
const (
	// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
	RequestWaitTimeout = 3 * time.Second
	// tickDuration is the tick interval given to every test member (it is
	// converted to TickMs in mustNewMember) and the polling interval used by
	// the wait helpers.
	tickDuration = 10 * time.Millisecond
	// requestTimeout bounds each membership API call issued by the helpers.
	requestTimeout = 20 * time.Second
	// clusterName is used as the initial cluster token of every member.
	clusterName = "etcd"
	// basePort is added to the listener counter when building listener addresses.
	basePort = 21000
	// UrlScheme is the URL scheme used when no TLS info is configured.
	UrlScheme = "unix"
	// UrlSchemeTLS is the URL scheme used when TLS info is configured.
	UrlSchemeTLS = "unixs"
)
// Package-level defaults and fixtures used by the integration cluster helpers.
var (
	// electionTicks is the election tick count assigned to every member.
	electionTicks = 10
	// integration test uses unique ports, counting up, to listen for each
	// member, ensuring restarted members can listen on the same port again.
	localListenCount int64 = 0
	// testTLSInfo points at the key, certificate and CA files under
	// ./fixtures and requires client certificate authentication.
	testTLSInfo = transport.TLSInfo{
		KeyFile:        "./fixtures/server.key.insecure",
		CertFile:       "./fixtures/server.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}
	// testTLSInfoIP is like testTLSInfo but uses the "server-ip" key and
	// certificate files.
	testTLSInfoIP = transport.TLSInfo{
		KeyFile:        "./fixtures/server-ip.key.insecure",
		CertFile:       "./fixtures/server-ip.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}
	// testTLSInfoExpired points at the files under ./fixtures-expired.
	testTLSInfoExpired = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server.key.insecure",
		CertFile:       "./fixtures-expired/server.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}
	// testTLSInfoExpiredIP is like testTLSInfoExpired but uses the
	// "server-ip" key and certificate files.
	testTLSInfoExpiredIP = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server-ip.key.insecure",
		CertFile:       "./fixtures-expired/server-ip.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}
	// lg is the logger used for member launch messages; it discards
	// everything unless init replaces it.
	lg = zap.NewNop()
)
  99. func init() {
  100. if os.Getenv("CLUSTER_DEBUG") != "" {
  101. lg, _ = zap.NewProduction()
  102. }
  103. }
// ClusterConfig describes how a test cluster and its members are built.
type ClusterConfig struct {
	// Size is the number of members newCluster creates.
	Size int
	// PeerTLS, when non-nil, is handed to every member as its peer TLS info
	// and selects the TLS URL scheme for peer URLs.
	PeerTLS *transport.TLSInfo
	// ClientTLS, when non-nil, is handed to every member as its client TLS
	// info and is used by the HTTP clients the helpers create.
	ClientTLS *transport.TLSInfo
	// DiscoveryURL, when non-empty, is copied to every member and makes
	// fillClusterForMembers skip the static initial peer map.
	DiscoveryURL string
	// UseGRPC makes every new member open its gRPC listener (listenGRPC).
	UseGRPC bool
	// The following values are forwarded to each member's server
	// configuration; mustNewMember substitutes defaults for zero values of
	// MaxTxnOps, MaxRequestBytes, SnapshotCount and SnapshotCatchUpEntries.
	QuotaBackendBytes      int64
	MaxTxnOps              uint
	MaxRequestBytes        uint
	SnapshotCount          uint64
	SnapshotCatchUpEntries uint64
	// gRPC keepalive settings; only positive values are turned into gRPC
	// server options by mustNewMember.
	GRPCKeepAliveMinTime  time.Duration
	GRPCKeepAliveInterval time.Duration
	GRPCKeepAliveTimeout  time.Duration
	// SkipCreatingClient to skip creating clients for each member.
	SkipCreatingClient bool
	// ClientMaxCallSendMsgSize and ClientMaxCallRecvMsgSize are forwarded to
	// the clientv3 configuration built by NewClientV3.
	ClientMaxCallSendMsgSize int
	ClientMaxCallRecvMsgSize int
	// UseIP is true to use only IP for gRPC requests.
	UseIP bool
}
// cluster is a group of test members together with the configuration they
// were created from.
type cluster struct {
	// cfg is the configuration passed to newCluster.
	cfg *ClusterConfig
	// Members holds the current members; addMember appends to it and
	// removeMember replaces it with the remaining members.
	Members []*member
}
  129. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  130. if tls == nil {
  131. return UrlScheme
  132. }
  133. return UrlSchemeTLS
  134. }
  135. func (c *cluster) fillClusterForMembers() error {
  136. if c.cfg.DiscoveryURL != "" {
  137. // cluster will be discovered
  138. return nil
  139. }
  140. addrs := make([]string, 0)
  141. for _, m := range c.Members {
  142. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  143. for _, l := range m.PeerListeners {
  144. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  145. }
  146. }
  147. clusterStr := strings.Join(addrs, ",")
  148. var err error
  149. for _, m := range c.Members {
  150. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  151. if err != nil {
  152. return err
  153. }
  154. }
  155. return nil
  156. }
  157. func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
  158. c := &cluster{cfg: cfg}
  159. ms := make([]*member, cfg.Size)
  160. for i := 0; i < cfg.Size; i++ {
  161. ms[i] = c.mustNewMember(t)
  162. }
  163. c.Members = ms
  164. if err := c.fillClusterForMembers(); err != nil {
  165. t.Fatal(err)
  166. }
  167. return c
  168. }
  169. // NewCluster returns an unlaunched cluster of the given size which has been
  170. // set to use static bootstrap.
  171. func NewCluster(t *testing.T, size int) *cluster {
  172. return newCluster(t, &ClusterConfig{Size: size})
  173. }
  174. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  175. func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
  176. return newCluster(t, cfg)
  177. }
  178. func (c *cluster) Launch(t *testing.T) {
  179. errc := make(chan error)
  180. for _, m := range c.Members {
  181. // Members are launched in separate goroutines because if they boot
  182. // using discovery url, they have to wait for others to register to continue.
  183. go func(m *member) {
  184. errc <- m.Launch()
  185. }(m)
  186. }
  187. for range c.Members {
  188. if err := <-errc; err != nil {
  189. t.Fatalf("error setting up member: %v", err)
  190. }
  191. }
  192. // wait cluster to be stable to receive future client requests
  193. c.waitMembersMatch(t, c.HTTPMembers())
  194. c.waitVersion()
  195. }
  196. func (c *cluster) URL(i int) string {
  197. return c.Members[i].ClientURLs[0].String()
  198. }
  199. // URLs returns a list of all active client URLs in the cluster
  200. func (c *cluster) URLs() []string {
  201. return getMembersURLs(c.Members)
  202. }
  203. func getMembersURLs(members []*member) []string {
  204. urls := make([]string, 0)
  205. for _, m := range members {
  206. select {
  207. case <-m.s.StopNotify():
  208. continue
  209. default:
  210. }
  211. for _, u := range m.ClientURLs {
  212. urls = append(urls, u.String())
  213. }
  214. }
  215. return urls
  216. }
  217. // HTTPMembers returns a list of all active members as client.Members
  218. func (c *cluster) HTTPMembers() []client.Member {
  219. ms := []client.Member{}
  220. for _, m := range c.Members {
  221. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  222. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  223. cm := client.Member{Name: m.Name}
  224. for _, ln := range m.PeerListeners {
  225. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  226. }
  227. for _, ln := range m.ClientListeners {
  228. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  229. }
  230. ms = append(ms, cm)
  231. }
  232. return ms
  233. }
  234. func (c *cluster) mustNewMember(t *testing.T) *member {
  235. m := mustNewMember(t,
  236. memberConfig{
  237. name: c.name(rand.Int()),
  238. peerTLS: c.cfg.PeerTLS,
  239. clientTLS: c.cfg.ClientTLS,
  240. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  241. maxTxnOps: c.cfg.MaxTxnOps,
  242. maxRequestBytes: c.cfg.MaxRequestBytes,
  243. snapshotCount: c.cfg.SnapshotCount,
  244. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  245. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  246. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  247. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  248. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  249. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  250. useIP: c.cfg.UseIP,
  251. })
  252. m.DiscoveryURL = c.cfg.DiscoveryURL
  253. if c.cfg.UseGRPC {
  254. if err := m.listenGRPC(); err != nil {
  255. t.Fatal(err)
  256. }
  257. }
  258. return m
  259. }
  260. func (c *cluster) addMember(t *testing.T) {
  261. m := c.mustNewMember(t)
  262. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  263. // send add request to the cluster
  264. var err error
  265. for i := 0; i < len(c.Members); i++ {
  266. clientURL := c.URL(i)
  267. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  268. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  269. break
  270. }
  271. }
  272. if err != nil {
  273. t.Fatalf("add member failed on all members error: %v", err)
  274. }
  275. m.InitialPeerURLsMap = types.URLsMap{}
  276. for _, mm := range c.Members {
  277. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  278. }
  279. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  280. m.NewCluster = false
  281. if err := m.Launch(); err != nil {
  282. t.Fatal(err)
  283. }
  284. c.Members = append(c.Members, m)
  285. // wait cluster to be stable to receive future client requests
  286. c.waitMembersMatch(t, c.HTTPMembers())
  287. }
  288. func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
  289. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  290. ma := client.NewMembersAPI(cc)
  291. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  292. _, err := ma.Add(ctx, peerURL)
  293. cancel()
  294. if err != nil {
  295. return err
  296. }
  297. // wait for the add node entry applied in the cluster
  298. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  299. c.waitMembersMatch(t, members)
  300. return nil
  301. }
  302. func (c *cluster) AddMember(t *testing.T) {
  303. c.addMember(t)
  304. }
  305. func (c *cluster) RemoveMember(t *testing.T, id uint64) {
  306. if err := c.removeMember(t, id); err != nil {
  307. t.Fatal(err)
  308. }
  309. }
  310. func (c *cluster) removeMember(t *testing.T, id uint64) error {
  311. // send remove request to the cluster
  312. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  313. ma := client.NewMembersAPI(cc)
  314. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  315. err := ma.Remove(ctx, types.ID(id).String())
  316. cancel()
  317. if err != nil {
  318. return err
  319. }
  320. newMembers := make([]*member, 0)
  321. for _, m := range c.Members {
  322. if uint64(m.s.ID()) != id {
  323. newMembers = append(newMembers, m)
  324. } else {
  325. select {
  326. case <-m.s.StopNotify():
  327. m.Terminate(t)
  328. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  329. // TODO: remove connection write timeout by selecting on http response closeNotifier
  330. // blocking on https://github.com/golang/go/issues/9524
  331. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  332. t.Fatalf("failed to remove member %s in time", m.s.ID())
  333. }
  334. }
  335. }
  336. c.Members = newMembers
  337. c.waitMembersMatch(t, c.HTTPMembers())
  338. return nil
  339. }
  340. func (c *cluster) Terminate(t *testing.T) {
  341. var wg sync.WaitGroup
  342. wg.Add(len(c.Members))
  343. for _, m := range c.Members {
  344. go func(mm *member) {
  345. defer wg.Done()
  346. mm.Terminate(t)
  347. }(m)
  348. }
  349. wg.Wait()
  350. }
  351. func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
  352. for _, u := range c.URLs() {
  353. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  354. ma := client.NewMembersAPI(cc)
  355. for {
  356. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  357. ms, err := ma.List(ctx)
  358. cancel()
  359. if err == nil && isMembersEqual(ms, membs) {
  360. break
  361. }
  362. time.Sleep(tickDuration)
  363. }
  364. }
  365. }
  366. func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
  367. // waitLeader waits until given members agree on the same leader.
  368. func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
  369. possibleLead := make(map[uint64]bool)
  370. var lead uint64
  371. for _, m := range membs {
  372. possibleLead[uint64(m.s.ID())] = true
  373. }
  374. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  375. kapi := client.NewKeysAPI(cc)
  376. // ensure leader is up via linearizable get
  377. for {
  378. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  379. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  380. cancel()
  381. if err == nil || strings.Contains(err.Error(), "Key not found") {
  382. break
  383. }
  384. }
  385. for lead == 0 || !possibleLead[lead] {
  386. lead = 0
  387. for _, m := range membs {
  388. select {
  389. case <-m.s.StopNotify():
  390. continue
  391. default:
  392. }
  393. if lead != 0 && lead != m.s.Lead() {
  394. lead = 0
  395. time.Sleep(10 * tickDuration)
  396. break
  397. }
  398. lead = m.s.Lead()
  399. }
  400. }
  401. for i, m := range membs {
  402. if uint64(m.s.ID()) == lead {
  403. return i
  404. }
  405. }
  406. return -1
  407. }
  408. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  409. // waitNoLeader waits until given members lose leader.
  410. func (c *cluster) waitNoLeader(membs []*member) {
  411. noLeader := false
  412. for !noLeader {
  413. noLeader = true
  414. for _, m := range membs {
  415. select {
  416. case <-m.s.StopNotify():
  417. continue
  418. default:
  419. }
  420. if m.s.Lead() != 0 {
  421. noLeader = false
  422. time.Sleep(10 * tickDuration)
  423. break
  424. }
  425. }
  426. }
  427. }
  428. func (c *cluster) waitVersion() {
  429. for _, m := range c.Members {
  430. for {
  431. if m.s.ClusterVersion() != nil {
  432. break
  433. }
  434. time.Sleep(tickDuration)
  435. }
  436. }
  437. }
  438. func (c *cluster) name(i int) string {
  439. return fmt.Sprint(i)
  440. }
  441. // isMembersEqual checks whether two members equal except ID field.
  442. // The given wmembs should always set ID field to empty string.
  443. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  444. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  445. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  446. for i := range membs {
  447. membs[i].ID = ""
  448. }
  449. return reflect.DeepEqual(membs, wmembs)
  450. }
  451. func newLocalListener(t *testing.T) net.Listener {
  452. c := atomic.AddInt64(&localListenCount, 1)
  453. // Go 1.8+ allows only numbers in port
  454. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  455. return NewListenerWithAddr(t, addr)
  456. }
  457. func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
  458. l, err := transport.NewUnixListener(addr)
  459. if err != nil {
  460. t.Fatal(err)
  461. }
  462. return l
  463. }
// member is a single etcd server under test together with the listeners,
// servers and clients the test harness runs around it.
type member struct {
	// ServerConfig is the configuration passed to etcdserver.NewServer in Launch.
	etcdserver.ServerConfig
	// PeerListeners and ClientListeners are the listeners Launch serves
	// peer and client HTTP traffic on.
	PeerListeners, ClientListeners []net.Listener
	// grpcListener is set by listenGRPC; Launch starts the gRPC servers
	// only when it is non-nil.
	grpcListener net.Listener
	// PeerTLSInfo enables peer TLS when set
	PeerTLSInfo *transport.TLSInfo
	// ClientTLSInfo enables client TLS when set
	ClientTLSInfo *transport.TLSInfo
	// raftHandler wraps the peer HTTP handler; it is created in Launch.
	raftHandler *testutil.PauseableHandler
	// s is the etcd server created in Launch.
	s *etcdserver.EtcdServer
	// serverClosers holds one shutdown function per listener served by Launch.
	serverClosers []func()
	// grpcServerOpts are the keepalive options built in mustNewMember and
	// passed to the client-facing gRPC server.
	grpcServerOpts []grpc.ServerOption
	// grpcServer serves gRPC on grpcListener.
	grpcServer *grpc.Server
	// grpcServerPeer serves gRPC on the peer listeners.
	grpcServerPeer *grpc.Server
	// grpcAddr is the endpoint handed to clients; listenGRPC sets it to the
	// bridge's inbound address prefixed with the client URL scheme.
	grpcAddr string
	// grpcBridge is created by listenGRPC for grpcAddr and is driven by
	// DropConnections, PauseConnections, Blackhole and their counterparts.
	grpcBridge *bridge
	// serverClient is a clientv3 that directly calls the etcdserver.
	serverClient *clientv3.Client
	// NOTE(review): presumably tells Terminate to keep the data directory;
	// not used in the visible part of this file — confirm.
	keepDataDirTerminate bool
	// clientMaxCallSendMsgSize and clientMaxCallRecvMsgSize are forwarded to
	// the clientv3 configuration in NewClientV3.
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	// useIP makes listenGRPC use "127.0.0.1" instead of "localhost" in the
	// gRPC address.
	useIP bool
}
  487. func (m *member) GRPCAddr() string { return m.grpcAddr }
// memberConfig carries the per-member settings consumed by mustNewMember.
type memberConfig struct {
	// name becomes the member's Name and its key in the initial peer map.
	name string
	// peerTLS and clientTLS, when non-nil, select the TLS URL scheme and are
	// stored on the member as PeerTLSInfo and ClientTLSInfo.
	peerTLS   *transport.TLSInfo
	clientTLS *transport.TLSInfo
	// quotaBackendBytes is copied to the server configuration as is.
	quotaBackendBytes int64
	// maxTxnOps and maxRequestBytes fall back to the embed defaults when zero.
	maxTxnOps       uint
	maxRequestBytes uint
	// snapshotCount and snapshotCatchUpEntries fall back to the etcdserver
	// defaults when zero.
	snapshotCount          uint64
	snapshotCatchUpEntries uint64
	// gRPC keepalive settings; only positive values produce server options.
	grpcKeepAliveMinTime  time.Duration
	grpcKeepAliveInterval time.Duration
	grpcKeepAliveTimeout  time.Duration
	// clientMaxCallSendMsgSize and clientMaxCallRecvMsgSize are stored on the
	// member for later use by NewClientV3.
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	// useIP is stored on the member and read by listenGRPC.
	useIP bool
}
  504. // mustNewMember return an inited member with the given name. If peerTLS is
  505. // set, it will use https scheme to communicate between peers.
  506. func mustNewMember(t *testing.T, mcfg memberConfig) *member {
  507. var err error
  508. m := &member{}
  509. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  510. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  511. pln := newLocalListener(t)
  512. m.PeerListeners = []net.Listener{pln}
  513. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  514. if err != nil {
  515. t.Fatal(err)
  516. }
  517. m.PeerTLSInfo = mcfg.peerTLS
  518. cln := newLocalListener(t)
  519. m.ClientListeners = []net.Listener{cln}
  520. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  521. if err != nil {
  522. t.Fatal(err)
  523. }
  524. m.ClientTLSInfo = mcfg.clientTLS
  525. m.Name = mcfg.name
  526. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  531. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  532. if err != nil {
  533. t.Fatal(err)
  534. }
  535. m.InitialClusterToken = clusterName
  536. m.NewCluster = true
  537. m.BootstrapTimeout = 10 * time.Millisecond
  538. if m.PeerTLSInfo != nil {
  539. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  540. }
  541. m.ElectionTicks = electionTicks
  542. m.InitialElectionTickAdvance = true
  543. m.TickMs = uint(tickDuration / time.Millisecond)
  544. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  545. m.MaxTxnOps = mcfg.maxTxnOps
  546. if m.MaxTxnOps == 0 {
  547. m.MaxTxnOps = embed.DefaultMaxTxnOps
  548. }
  549. m.MaxRequestBytes = mcfg.maxRequestBytes
  550. if m.MaxRequestBytes == 0 {
  551. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  552. }
  553. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  554. if mcfg.snapshotCount != 0 {
  555. m.SnapshotCount = mcfg.snapshotCount
  556. }
  557. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  558. if mcfg.snapshotCatchUpEntries != 0 {
  559. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  560. }
  561. m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
  562. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  563. m.grpcServerOpts = []grpc.ServerOption{}
  564. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  565. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  566. MinTime: mcfg.grpcKeepAliveMinTime,
  567. PermitWithoutStream: false,
  568. }))
  569. }
  570. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  571. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  572. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  573. Time: mcfg.grpcKeepAliveInterval,
  574. Timeout: mcfg.grpcKeepAliveTimeout,
  575. }))
  576. }
  577. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  578. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  579. m.useIP = mcfg.useIP
  580. m.InitialCorruptCheck = true
  581. m.LoggerConfig = &zap.Config{
  582. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  583. Development: false,
  584. Sampling: &zap.SamplingConfig{
  585. Initial: 100,
  586. Thereafter: 100,
  587. },
  588. Encoding: "json",
  589. EncoderConfig: zap.NewProductionEncoderConfig(),
  590. OutputPaths: []string{"/dev/null"},
  591. ErrorOutputPaths: []string{"/dev/null"},
  592. }
  593. if os.Getenv("CLUSTER_DEBUG") != "" {
  594. m.LoggerConfig.OutputPaths = []string{"stderr"}
  595. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  596. }
  597. m.Logger, err = m.LoggerConfig.Build()
  598. if err != nil {
  599. t.Fatal(err)
  600. }
  601. return m
  602. }
  603. // listenGRPC starts a grpc server over a unix domain socket on the member
  604. func (m *member) listenGRPC() error {
  605. // prefix with localhost so cert has right domain
  606. m.grpcAddr = "localhost:" + m.Name
  607. if m.useIP { // for IP-only TLS certs
  608. m.grpcAddr = "127.0.0.1:" + m.Name
  609. }
  610. l, err := transport.NewUnixListener(m.grpcAddr)
  611. if err != nil {
  612. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  613. }
  614. m.grpcBridge, err = newBridge(m.grpcAddr)
  615. if err != nil {
  616. l.Close()
  617. return err
  618. }
  619. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  620. m.grpcListener = l
  621. return nil
  622. }
  623. func (m *member) ElectionTimeout() time.Duration {
  624. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  625. }
  626. func (m *member) ID() types.ID { return m.s.ID() }
  627. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  628. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  629. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  630. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  631. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  632. // NewClientV3 creates a new grpc client connection to the member
  633. func NewClientV3(m *member) (*clientv3.Client, error) {
  634. if m.grpcAddr == "" {
  635. return nil, fmt.Errorf("member not configured for grpc")
  636. }
  637. cfg := clientv3.Config{
  638. Endpoints: []string{m.grpcAddr},
  639. DialTimeout: 5 * time.Second,
  640. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  641. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  642. }
  643. if m.ClientTLSInfo != nil {
  644. tls, err := m.ClientTLSInfo.ClientConfig()
  645. if err != nil {
  646. return nil, err
  647. }
  648. cfg.TLS = tls
  649. }
  650. return newClientV3(cfg)
  651. }
  652. // Clone returns a member with the same server configuration. The returned
  653. // member will not set PeerListeners and ClientListeners.
  654. func (m *member) Clone(t *testing.T) *member {
  655. mm := &member{}
  656. mm.ServerConfig = m.ServerConfig
  657. var err error
  658. clientURLStrs := m.ClientURLs.StringSlice()
  659. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  660. if err != nil {
  661. // this should never fail
  662. panic(err)
  663. }
  664. peerURLStrs := m.PeerURLs.StringSlice()
  665. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  666. if err != nil {
  667. // this should never fail
  668. panic(err)
  669. }
  670. clusterStr := m.InitialPeerURLsMap.String()
  671. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  672. if err != nil {
  673. // this should never fail
  674. panic(err)
  675. }
  676. mm.InitialClusterToken = m.InitialClusterToken
  677. mm.ElectionTicks = m.ElectionTicks
  678. mm.PeerTLSInfo = m.PeerTLSInfo
  679. mm.ClientTLSInfo = m.ClientTLSInfo
  680. return mm
  681. }
  682. // Launch starts a member based on ServerConfig, PeerListeners
  683. // and ClientListeners.
  684. func (m *member) Launch() error {
  685. lg.Info(
  686. "launching a member",
  687. zap.String("name", m.Name),
  688. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  689. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  690. zap.String("grpc-address", m.grpcAddr),
  691. )
  692. var err error
  693. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  694. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  695. }
  696. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  697. m.s.Start()
  698. var peerTLScfg *tls.Config
  699. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  700. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  701. return err
  702. }
  703. }
  704. if m.grpcListener != nil {
  705. var (
  706. tlscfg *tls.Config
  707. )
  708. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  709. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  710. if err != nil {
  711. return err
  712. }
  713. }
  714. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  715. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  716. m.serverClient = v3client.New(m.s)
  717. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  718. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  719. go m.grpcServer.Serve(m.grpcListener)
  720. }
  721. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  722. h := (http.Handler)(m.raftHandler)
  723. if m.grpcListener != nil {
  724. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  725. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  726. m.grpcServerPeer.ServeHTTP(w, r)
  727. } else {
  728. m.raftHandler.ServeHTTP(w, r)
  729. }
  730. })
  731. }
  732. for _, ln := range m.PeerListeners {
  733. cm := cmux.New(ln)
  734. // don't hang on matcher after closing listener
  735. cm.SetReadTimeout(time.Second)
  736. if m.grpcServer != nil {
  737. grpcl := cm.Match(cmux.HTTP2())
  738. go m.grpcServerPeer.Serve(grpcl)
  739. }
  740. // serve http1/http2 rafthttp/grpc
  741. ll := cm.Match(cmux.Any())
  742. if peerTLScfg != nil {
  743. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  744. return err
  745. }
  746. }
  747. hs := &httptest.Server{
  748. Listener: ll,
  749. Config: &http.Server{
  750. Handler: h,
  751. TLSConfig: peerTLScfg,
  752. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  753. },
  754. TLS: peerTLScfg,
  755. }
  756. hs.Start()
  757. donec := make(chan struct{})
  758. go func() {
  759. defer close(donec)
  760. cm.Serve()
  761. }()
  762. closer := func() {
  763. ll.Close()
  764. hs.CloseClientConnections()
  765. hs.Close()
  766. <-donec
  767. }
  768. m.serverClosers = append(m.serverClosers, closer)
  769. }
  770. for _, ln := range m.ClientListeners {
  771. hs := &httptest.Server{
  772. Listener: ln,
  773. Config: &http.Server{
  774. Handler: v2http.NewClientHandler(
  775. m.Logger,
  776. m.s,
  777. m.ServerConfig.ReqTimeout(),
  778. ),
  779. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  780. },
  781. }
  782. if m.ClientTLSInfo == nil {
  783. hs.Start()
  784. } else {
  785. info := m.ClientTLSInfo
  786. hs.TLS, err = info.ServerConfig()
  787. if err != nil {
  788. return err
  789. }
  790. // baseConfig is called on initial TLS handshake start.
  791. //
  792. // Previously,
  793. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  794. // 2. Server calls (*tls.Config).GetCertificate iff:
  795. // - Server's (*tls.Config).Certificates is not empty, or
  796. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  797. //
  798. // When (*tls.Config).Certificates is always populated on initial handshake,
  799. // client is expected to provide a valid matching SNI to pass the TLS
  800. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  801. // TLS assets. However, a cert whose SAN field does not include domain names
  802. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  803. // it was never able to trigger TLS reload on initial handshake; first
  804. // ceritifcate object was being used, never being updated.
  805. //
  806. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  807. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  808. // rest of the certificates on every new TLS connection, even when client
  809. // SNI is empty (e.g. cert only includes IPs).
  810. //
  811. // This introduces another problem with "httptest.Server":
  812. // when server initial certificates are empty, certificates
  813. // are overwritten by Go's internal test certs, which have
  814. // different SAN fields (e.g. example.com). To work around,
  815. // re-overwrite (*tls.Config).Certificates before starting
  816. // test server.
  817. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  818. if err != nil {
  819. return err
  820. }
  821. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  822. hs.StartTLS()
  823. }
  824. closer := func() {
  825. ln.Close()
  826. hs.CloseClientConnections()
  827. hs.Close()
  828. }
  829. m.serverClosers = append(m.serverClosers, closer)
  830. }
  831. lg.Info(
  832. "launched a member",
  833. zap.String("name", m.Name),
  834. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  835. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  836. zap.String("grpc-address", m.grpcAddr),
  837. )
  838. return nil
  839. }
  840. func (m *member) WaitOK(t *testing.T) {
  841. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  842. kapi := client.NewKeysAPI(cc)
  843. for {
  844. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  845. _, err := kapi.Get(ctx, "/", nil)
  846. if err != nil {
  847. time.Sleep(tickDuration)
  848. continue
  849. }
  850. cancel()
  851. break
  852. }
  853. for m.s.Leader() == 0 {
  854. time.Sleep(tickDuration)
  855. }
  856. }
  857. func (m *member) URL() string { return m.ClientURLs[0].String() }
  858. func (m *member) Pause() {
  859. m.raftHandler.Pause()
  860. m.s.PauseSending()
  861. }
  862. func (m *member) Resume() {
  863. m.raftHandler.Resume()
  864. m.s.ResumeSending()
  865. }
  866. // Close stops the member's etcdserver and closes its connections
  867. func (m *member) Close() {
  868. if m.grpcBridge != nil {
  869. m.grpcBridge.Close()
  870. m.grpcBridge = nil
  871. }
  872. if m.serverClient != nil {
  873. m.serverClient.Close()
  874. m.serverClient = nil
  875. }
  876. if m.grpcServer != nil {
  877. m.grpcServer.Stop()
  878. m.grpcServer.GracefulStop()
  879. m.grpcServer = nil
  880. m.grpcServerPeer.Stop()
  881. m.grpcServerPeer.GracefulStop()
  882. m.grpcServerPeer = nil
  883. }
  884. m.s.HardStop()
  885. for _, f := range m.serverClosers {
  886. f()
  887. }
  888. }
  889. // Stop stops the member, but the data dir of the member is preserved.
  890. func (m *member) Stop(t *testing.T) {
  891. lg.Info(
  892. "stopping a member",
  893. zap.String("name", m.Name),
  894. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  895. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  896. zap.String("grpc-address", m.grpcAddr),
  897. )
  898. m.Close()
  899. m.serverClosers = nil
  900. lg.Info(
  901. "stopped a member",
  902. zap.String("name", m.Name),
  903. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  904. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  905. zap.String("grpc-address", m.grpcAddr),
  906. )
  907. }
  908. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  909. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  910. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  911. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  912. time.Sleep(interval)
  913. }
  914. return m.s.Lead()
  915. }
  916. // StopNotify unblocks when a member stop completes
  917. func (m *member) StopNotify() <-chan struct{} {
  918. return m.s.StopNotify()
  919. }
  920. // Restart starts the member using the preserved data dir.
  921. func (m *member) Restart(t *testing.T) error {
  922. lg.Info(
  923. "restarting a member",
  924. zap.String("name", m.Name),
  925. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  926. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  927. zap.String("grpc-address", m.grpcAddr),
  928. )
  929. newPeerListeners := make([]net.Listener, 0)
  930. for _, ln := range m.PeerListeners {
  931. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  932. }
  933. m.PeerListeners = newPeerListeners
  934. newClientListeners := make([]net.Listener, 0)
  935. for _, ln := range m.ClientListeners {
  936. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  937. }
  938. m.ClientListeners = newClientListeners
  939. if m.grpcListener != nil {
  940. if err := m.listenGRPC(); err != nil {
  941. t.Fatal(err)
  942. }
  943. }
  944. err := m.Launch()
  945. lg.Info(
  946. "restarted a member",
  947. zap.String("name", m.Name),
  948. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  949. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  950. zap.String("grpc-address", m.grpcAddr),
  951. zap.Error(err),
  952. )
  953. return err
  954. }
  955. // Terminate stops the member and removes the data dir.
  956. func (m *member) Terminate(t *testing.T) {
  957. lg.Info(
  958. "terminating a member",
  959. zap.String("name", m.Name),
  960. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  961. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  962. zap.String("grpc-address", m.grpcAddr),
  963. )
  964. m.Close()
  965. if !m.keepDataDirTerminate {
  966. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  967. t.Fatal(err)
  968. }
  969. }
  970. lg.Info(
  971. "terminated a member",
  972. zap.String("name", m.Name),
  973. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  974. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  975. zap.String("grpc-address", m.grpcAddr),
  976. )
  977. }
  978. // Metric gets the metric value for a member
  979. func (m *member) Metric(metricName string) (string, error) {
  980. cfgtls := transport.TLSInfo{}
  981. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  982. if err != nil {
  983. return "", err
  984. }
  985. cli := &http.Client{Transport: tr}
  986. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  987. if err != nil {
  988. return "", err
  989. }
  990. defer resp.Body.Close()
  991. b, rerr := ioutil.ReadAll(resp.Body)
  992. if rerr != nil {
  993. return "", rerr
  994. }
  995. lines := strings.Split(string(b), "\n")
  996. for _, l := range lines {
  997. if strings.HasPrefix(l, metricName) {
  998. return strings.Split(l, " ")[1], nil
  999. }
  1000. }
  1001. return "", nil
  1002. }
  1003. // InjectPartition drops connections from m to others, vice versa.
  1004. func (m *member) InjectPartition(t *testing.T, others ...*member) {
  1005. for _, other := range others {
  1006. m.s.CutPeer(other.s.ID())
  1007. other.s.CutPeer(m.s.ID())
  1008. }
  1009. }
  1010. // RecoverPartition recovers connections from m to others, vice versa.
  1011. func (m *member) RecoverPartition(t *testing.T, others ...*member) {
  1012. for _, other := range others {
  1013. m.s.MendPeer(other.s.ID())
  1014. other.s.MendPeer(m.s.ID())
  1015. }
  1016. }
  1017. func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
  1018. cfgtls := transport.TLSInfo{}
  1019. if tls != nil {
  1020. cfgtls = *tls
  1021. }
  1022. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1023. c, err := client.New(cfg)
  1024. if err != nil {
  1025. t.Fatal(err)
  1026. }
  1027. return c
  1028. }
  1029. func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
  1030. // tick in integration test is short, so 1s dial timeout could play well.
  1031. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1032. if err != nil {
  1033. t.Fatal(err)
  1034. }
  1035. return tr
  1036. }
  1037. type SortableMemberSliceByPeerURLs []client.Member
  1038. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1039. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1040. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1041. }
  1042. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1043. type ClusterV3 struct {
  1044. *cluster
  1045. mu sync.Mutex
  1046. clients []*clientv3.Client
  1047. }
  1048. // NewClusterV3 returns a launched cluster with a grpc client connection
  1049. // for each cluster member.
  1050. func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1051. cfg.UseGRPC = true
  1052. if os.Getenv("CLIENT_DEBUG") != "" {
  1053. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1054. }
  1055. clus := &ClusterV3{
  1056. cluster: NewClusterByConfig(t, cfg),
  1057. }
  1058. clus.Launch(t)
  1059. if !cfg.SkipCreatingClient {
  1060. for _, m := range clus.Members {
  1061. client, err := NewClientV3(m)
  1062. if err != nil {
  1063. t.Fatalf("cannot create client: %v", err)
  1064. }
  1065. clus.clients = append(clus.clients, client)
  1066. }
  1067. }
  1068. return clus
  1069. }
  1070. func (c *ClusterV3) TakeClient(idx int) {
  1071. c.mu.Lock()
  1072. c.clients[idx] = nil
  1073. c.mu.Unlock()
  1074. }
  1075. func (c *ClusterV3) Terminate(t *testing.T) {
  1076. c.mu.Lock()
  1077. for _, client := range c.clients {
  1078. if client == nil {
  1079. continue
  1080. }
  1081. if err := client.Close(); err != nil {
  1082. t.Error(err)
  1083. }
  1084. }
  1085. c.mu.Unlock()
  1086. c.cluster.Terminate(t)
  1087. }
  1088. func (c *ClusterV3) RandClient() *clientv3.Client {
  1089. return c.clients[rand.Intn(len(c.clients))]
  1090. }
  1091. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1092. return c.clients[i]
  1093. }
  1094. type grpcAPI struct {
  1095. // Cluster is the cluster API for the client's connection.
  1096. Cluster pb.ClusterClient
  1097. // KV is the keyvalue API for the client's connection.
  1098. KV pb.KVClient
  1099. // Lease is the lease API for the client's connection.
  1100. Lease pb.LeaseClient
  1101. // Watch is the watch API for the client's connection.
  1102. Watch pb.WatchClient
  1103. // Maintenance is the maintenance API for the client's connection.
  1104. Maintenance pb.MaintenanceClient
  1105. // Auth is the authentication API for the client's connection.
  1106. Auth pb.AuthClient
  1107. // Lock is the lock API for the client's connection.
  1108. Lock lockpb.LockClient
  1109. // Election is the election API for the client's connection.
  1110. Election epb.ElectionClient
  1111. }