cluster.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "go.etcd.io/etcd/client"
  34. "go.etcd.io/etcd/clientv3"
  35. "go.etcd.io/etcd/embed"
  36. "go.etcd.io/etcd/etcdserver"
  37. "go.etcd.io/etcd/etcdserver/api/etcdhttp"
  38. "go.etcd.io/etcd/etcdserver/api/rafthttp"
  39. "go.etcd.io/etcd/etcdserver/api/v2http"
  40. "go.etcd.io/etcd/etcdserver/api/v3client"
  41. "go.etcd.io/etcd/etcdserver/api/v3election"
  42. epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  43. "go.etcd.io/etcd/etcdserver/api/v3lock"
  44. lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  46. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  47. "go.etcd.io/etcd/pkg/testutil"
  48. "go.etcd.io/etcd/pkg/tlsutil"
  49. "go.etcd.io/etcd/pkg/transport"
  50. "go.etcd.io/etcd/pkg/types"
  51. "github.com/soheilhy/cmux"
  52. "go.uber.org/zap"
  53. "golang.org/x/crypto/bcrypt"
  54. "google.golang.org/grpc"
  55. "google.golang.org/grpc/grpclog"
  56. "google.golang.org/grpc/keepalive"
  57. )
  58. const (
  59. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  60. RequestWaitTimeout = 3 * time.Second
  61. tickDuration = 10 * time.Millisecond
  62. requestTimeout = 20 * time.Second
  63. clusterName = "etcd"
  64. basePort = 21000
  65. URLScheme = "unix"
  66. URLSchemeTLS = "unixs"
  67. )
  68. var (
  69. electionTicks = 10
  70. // integration test uses unique ports, counting up, to listen for each
  71. // member, ensuring restarted members can listen on the same port again.
  72. localListenCount = int64(0)
  73. testTLSInfo = transport.TLSInfo{
  74. KeyFile: "./fixtures/server.key.insecure",
  75. CertFile: "./fixtures/server.crt",
  76. TrustedCAFile: "./fixtures/ca.crt",
  77. ClientCertAuth: true,
  78. }
  79. testTLSInfoIP = transport.TLSInfo{
  80. KeyFile: "./fixtures/server-ip.key.insecure",
  81. CertFile: "./fixtures/server-ip.crt",
  82. TrustedCAFile: "./fixtures/ca.crt",
  83. ClientCertAuth: true,
  84. }
  85. testTLSInfoExpired = transport.TLSInfo{
  86. KeyFile: "./fixtures-expired/server.key.insecure",
  87. CertFile: "./fixtures-expired/server.crt",
  88. TrustedCAFile: "./fixtures-expired/ca.crt",
  89. ClientCertAuth: true,
  90. }
  91. testTLSInfoExpiredIP = transport.TLSInfo{
  92. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  93. CertFile: "./fixtures-expired/server-ip.crt",
  94. TrustedCAFile: "./fixtures-expired/ca.crt",
  95. ClientCertAuth: true,
  96. }
  97. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  98. lg = zap.NewNop()
  99. )
  100. func init() {
  101. if os.Getenv("CLUSTER_DEBUG") != "" {
  102. lg, _ = zap.NewProduction()
  103. }
  104. }
  105. type ClusterConfig struct {
  106. Size int
  107. PeerTLS *transport.TLSInfo
  108. ClientTLS *transport.TLSInfo
  109. DiscoveryURL string
  110. AuthToken string
  111. UseGRPC bool
  112. QuotaBackendBytes int64
  113. MaxTxnOps uint
  114. MaxRequestBytes uint
  115. SnapshotCount uint64
  116. SnapshotCatchUpEntries uint64
  117. GRPCKeepAliveMinTime time.Duration
  118. GRPCKeepAliveInterval time.Duration
  119. GRPCKeepAliveTimeout time.Duration
  120. // SkipCreatingClient to skip creating clients for each member.
  121. SkipCreatingClient bool
  122. ClientMaxCallSendMsgSize int
  123. ClientMaxCallRecvMsgSize int
  124. // UseIP is true to use only IP for gRPC requests.
  125. UseIP bool
  126. LeaseCheckpointInterval time.Duration
  127. }
  128. type cluster struct {
  129. cfg *ClusterConfig
  130. Members []*member
  131. }
  132. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  133. if tls == nil {
  134. return URLScheme
  135. }
  136. return URLSchemeTLS
  137. }
  138. func (c *cluster) fillClusterForMembers() error {
  139. if c.cfg.DiscoveryURL != "" {
  140. // cluster will be discovered
  141. return nil
  142. }
  143. addrs := make([]string, 0)
  144. for _, m := range c.Members {
  145. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  146. for _, l := range m.PeerListeners {
  147. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  148. }
  149. }
  150. clusterStr := strings.Join(addrs, ",")
  151. var err error
  152. for _, m := range c.Members {
  153. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  154. if err != nil {
  155. return err
  156. }
  157. }
  158. return nil
  159. }
  160. func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
  161. c := &cluster{cfg: cfg}
  162. ms := make([]*member, cfg.Size)
  163. for i := 0; i < cfg.Size; i++ {
  164. ms[i] = c.mustNewMember(t)
  165. }
  166. c.Members = ms
  167. if err := c.fillClusterForMembers(); err != nil {
  168. t.Fatal(err)
  169. }
  170. return c
  171. }
  172. // NewCluster returns an unlaunched cluster of the given size which has been
  173. // set to use static bootstrap.
  174. func NewCluster(t testing.TB, size int) *cluster {
  175. return newCluster(t, &ClusterConfig{Size: size})
  176. }
  177. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  178. func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
  179. return newCluster(t, cfg)
  180. }
  181. func (c *cluster) Launch(t testing.TB) {
  182. errc := make(chan error)
  183. for _, m := range c.Members {
  184. // Members are launched in separate goroutines because if they boot
  185. // using discovery url, they have to wait for others to register to continue.
  186. go func(m *member) {
  187. errc <- m.Launch()
  188. }(m)
  189. }
  190. for range c.Members {
  191. if err := <-errc; err != nil {
  192. t.Fatalf("error setting up member: %v", err)
  193. }
  194. }
  195. // wait cluster to be stable to receive future client requests
  196. c.waitMembersMatch(t, c.HTTPMembers())
  197. c.waitVersion()
  198. }
  199. func (c *cluster) URL(i int) string {
  200. return c.Members[i].ClientURLs[0].String()
  201. }
  202. // URLs returns a list of all active client URLs in the cluster
  203. func (c *cluster) URLs() []string {
  204. return getMembersURLs(c.Members)
  205. }
  206. func getMembersURLs(members []*member) []string {
  207. urls := make([]string, 0)
  208. for _, m := range members {
  209. select {
  210. case <-m.s.StopNotify():
  211. continue
  212. default:
  213. }
  214. for _, u := range m.ClientURLs {
  215. urls = append(urls, u.String())
  216. }
  217. }
  218. return urls
  219. }
  220. // HTTPMembers returns a list of all active members as client.Members
  221. func (c *cluster) HTTPMembers() []client.Member {
  222. ms := []client.Member{}
  223. for _, m := range c.Members {
  224. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  225. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  226. cm := client.Member{Name: m.Name}
  227. for _, ln := range m.PeerListeners {
  228. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  229. }
  230. for _, ln := range m.ClientListeners {
  231. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  232. }
  233. ms = append(ms, cm)
  234. }
  235. return ms
  236. }
  237. func (c *cluster) mustNewMember(t testing.TB) *member {
  238. m := mustNewMember(t,
  239. memberConfig{
  240. name: c.name(rand.Int()),
  241. authToken: c.cfg.AuthToken,
  242. peerTLS: c.cfg.PeerTLS,
  243. clientTLS: c.cfg.ClientTLS,
  244. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  245. maxTxnOps: c.cfg.MaxTxnOps,
  246. maxRequestBytes: c.cfg.MaxRequestBytes,
  247. snapshotCount: c.cfg.SnapshotCount,
  248. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  249. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  250. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  251. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  252. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  253. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  254. useIP: c.cfg.UseIP,
  255. leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
  256. })
  257. m.DiscoveryURL = c.cfg.DiscoveryURL
  258. if c.cfg.UseGRPC {
  259. if err := m.listenGRPC(); err != nil {
  260. t.Fatal(err)
  261. }
  262. }
  263. return m
  264. }
  265. func (c *cluster) addMember(t testing.TB) {
  266. m := c.mustNewMember(t)
  267. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  268. // send add request to the cluster
  269. var err error
  270. for i := 0; i < len(c.Members); i++ {
  271. clientURL := c.URL(i)
  272. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  273. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  274. break
  275. }
  276. }
  277. if err != nil {
  278. t.Fatalf("add member failed on all members error: %v", err)
  279. }
  280. m.InitialPeerURLsMap = types.URLsMap{}
  281. for _, mm := range c.Members {
  282. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  283. }
  284. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  285. m.NewCluster = false
  286. if err := m.Launch(); err != nil {
  287. t.Fatal(err)
  288. }
  289. c.Members = append(c.Members, m)
  290. // wait cluster to be stable to receive future client requests
  291. c.waitMembersMatch(t, c.HTTPMembers())
  292. }
  293. func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
  294. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  295. ma := client.NewMembersAPI(cc)
  296. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  297. _, err := ma.Add(ctx, peerURL)
  298. cancel()
  299. if err != nil {
  300. return err
  301. }
  302. // wait for the add node entry applied in the cluster
  303. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  304. c.waitMembersMatch(t, members)
  305. return nil
  306. }
  307. func (c *cluster) AddMember(t testing.TB) {
  308. c.addMember(t)
  309. }
  310. func (c *cluster) RemoveMember(t testing.TB, id uint64) {
  311. if err := c.removeMember(t, id); err != nil {
  312. t.Fatal(err)
  313. }
  314. }
  315. func (c *cluster) removeMember(t testing.TB, id uint64) error {
  316. // send remove request to the cluster
  317. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  318. ma := client.NewMembersAPI(cc)
  319. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  320. err := ma.Remove(ctx, types.ID(id).String())
  321. cancel()
  322. if err != nil {
  323. return err
  324. }
  325. newMembers := make([]*member, 0)
  326. for _, m := range c.Members {
  327. if uint64(m.s.ID()) != id {
  328. newMembers = append(newMembers, m)
  329. } else {
  330. select {
  331. case <-m.s.StopNotify():
  332. m.Terminate(t)
  333. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  334. // TODO: remove connection write timeout by selecting on http response closeNotifier
  335. // blocking on https://github.com/golang/go/issues/9524
  336. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  337. t.Fatalf("failed to remove member %s in time", m.s.ID())
  338. }
  339. }
  340. }
  341. c.Members = newMembers
  342. c.waitMembersMatch(t, c.HTTPMembers())
  343. return nil
  344. }
  345. func (c *cluster) Terminate(t testing.TB) {
  346. var wg sync.WaitGroup
  347. wg.Add(len(c.Members))
  348. for _, m := range c.Members {
  349. go func(mm *member) {
  350. defer wg.Done()
  351. mm.Terminate(t)
  352. }(m)
  353. }
  354. wg.Wait()
  355. }
  356. func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
  357. for _, u := range c.URLs() {
  358. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  359. ma := client.NewMembersAPI(cc)
  360. for {
  361. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  362. ms, err := ma.List(ctx)
  363. cancel()
  364. if err == nil && isMembersEqual(ms, membs) {
  365. break
  366. }
  367. time.Sleep(tickDuration)
  368. }
  369. }
  370. }
  371. func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
  372. // waitLeader waits until given members agree on the same leader.
  373. func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
  374. possibleLead := make(map[uint64]bool)
  375. var lead uint64
  376. for _, m := range membs {
  377. possibleLead[uint64(m.s.ID())] = true
  378. }
  379. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  380. kapi := client.NewKeysAPI(cc)
  381. // ensure leader is up via linearizable get
  382. for {
  383. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  384. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  385. cancel()
  386. if err == nil || strings.Contains(err.Error(), "Key not found") {
  387. break
  388. }
  389. }
  390. for lead == 0 || !possibleLead[lead] {
  391. lead = 0
  392. for _, m := range membs {
  393. select {
  394. case <-m.s.StopNotify():
  395. continue
  396. default:
  397. }
  398. if lead != 0 && lead != m.s.Lead() {
  399. lead = 0
  400. time.Sleep(10 * tickDuration)
  401. break
  402. }
  403. lead = m.s.Lead()
  404. }
  405. }
  406. for i, m := range membs {
  407. if uint64(m.s.ID()) == lead {
  408. return i
  409. }
  410. }
  411. return -1
  412. }
  413. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  414. // waitNoLeader waits until given members lose leader.
  415. func (c *cluster) waitNoLeader(membs []*member) {
  416. noLeader := false
  417. for !noLeader {
  418. noLeader = true
  419. for _, m := range membs {
  420. select {
  421. case <-m.s.StopNotify():
  422. continue
  423. default:
  424. }
  425. if m.s.Lead() != 0 {
  426. noLeader = false
  427. time.Sleep(10 * tickDuration)
  428. break
  429. }
  430. }
  431. }
  432. }
  433. func (c *cluster) waitVersion() {
  434. for _, m := range c.Members {
  435. for {
  436. if m.s.ClusterVersion() != nil {
  437. break
  438. }
  439. time.Sleep(tickDuration)
  440. }
  441. }
  442. }
  443. func (c *cluster) name(i int) string {
  444. return fmt.Sprint(i)
  445. }
  446. // isMembersEqual checks whether two members equal except ID field.
  447. // The given wmembs should always set ID field to empty string.
  448. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  449. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  450. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  451. for i := range membs {
  452. membs[i].ID = ""
  453. }
  454. return reflect.DeepEqual(membs, wmembs)
  455. }
  456. func newLocalListener(t testing.TB) net.Listener {
  457. c := atomic.AddInt64(&localListenCount, 1)
  458. // Go 1.8+ allows only numbers in port
  459. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  460. return NewListenerWithAddr(t, addr)
  461. }
  462. func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
  463. l, err := transport.NewUnixListener(addr)
  464. if err != nil {
  465. t.Fatal(err)
  466. }
  467. return l
  468. }
  469. type member struct {
  470. etcdserver.ServerConfig
  471. PeerListeners, ClientListeners []net.Listener
  472. grpcListener net.Listener
  473. // PeerTLSInfo enables peer TLS when set
  474. PeerTLSInfo *transport.TLSInfo
  475. // ClientTLSInfo enables client TLS when set
  476. ClientTLSInfo *transport.TLSInfo
  477. DialOptions []grpc.DialOption
  478. raftHandler *testutil.PauseableHandler
  479. s *etcdserver.EtcdServer
  480. serverClosers []func()
  481. grpcServerOpts []grpc.ServerOption
  482. grpcServer *grpc.Server
  483. grpcServerPeer *grpc.Server
  484. grpcAddr string
  485. grpcBridge *bridge
  486. // serverClient is a clientv3 that directly calls the etcdserver.
  487. serverClient *clientv3.Client
  488. keepDataDirTerminate bool
  489. clientMaxCallSendMsgSize int
  490. clientMaxCallRecvMsgSize int
  491. useIP bool
  492. }
  493. func (m *member) GRPCAddr() string { return m.grpcAddr }
  494. type memberConfig struct {
  495. name string
  496. peerTLS *transport.TLSInfo
  497. clientTLS *transport.TLSInfo
  498. authToken string
  499. quotaBackendBytes int64
  500. maxTxnOps uint
  501. maxRequestBytes uint
  502. snapshotCount uint64
  503. snapshotCatchUpEntries uint64
  504. grpcKeepAliveMinTime time.Duration
  505. grpcKeepAliveInterval time.Duration
  506. grpcKeepAliveTimeout time.Duration
  507. clientMaxCallSendMsgSize int
  508. clientMaxCallRecvMsgSize int
  509. useIP bool
  510. leaseCheckpointInterval time.Duration
  511. }
  512. // mustNewMember return an inited member with the given name. If peerTLS is
  513. // set, it will use https scheme to communicate between peers.
  514. func mustNewMember(t testing.TB, mcfg memberConfig) *member {
  515. var err error
  516. m := &member{}
  517. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  518. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  519. pln := newLocalListener(t)
  520. m.PeerListeners = []net.Listener{pln}
  521. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. m.PeerTLSInfo = mcfg.peerTLS
  526. cln := newLocalListener(t)
  527. m.ClientListeners = []net.Listener{cln}
  528. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  529. if err != nil {
  530. t.Fatal(err)
  531. }
  532. m.ClientTLSInfo = mcfg.clientTLS
  533. m.Name = mcfg.name
  534. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  535. if err != nil {
  536. t.Fatal(err)
  537. }
  538. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  539. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  540. if err != nil {
  541. t.Fatal(err)
  542. }
  543. m.InitialClusterToken = clusterName
  544. m.NewCluster = true
  545. m.BootstrapTimeout = 10 * time.Millisecond
  546. if m.PeerTLSInfo != nil {
  547. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  548. }
  549. m.ElectionTicks = electionTicks
  550. m.InitialElectionTickAdvance = true
  551. m.TickMs = uint(tickDuration / time.Millisecond)
  552. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  553. m.MaxTxnOps = mcfg.maxTxnOps
  554. if m.MaxTxnOps == 0 {
  555. m.MaxTxnOps = embed.DefaultMaxTxnOps
  556. }
  557. m.MaxRequestBytes = mcfg.maxRequestBytes
  558. if m.MaxRequestBytes == 0 {
  559. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  560. }
  561. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  562. if mcfg.snapshotCount != 0 {
  563. m.SnapshotCount = mcfg.snapshotCount
  564. }
  565. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  566. if mcfg.snapshotCatchUpEntries != 0 {
  567. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  568. }
  569. // for the purpose of integration testing, simple token is enough
  570. m.AuthToken = "simple"
  571. if mcfg.authToken != "" {
  572. m.AuthToken = mcfg.authToken
  573. }
  574. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  575. m.grpcServerOpts = []grpc.ServerOption{}
  576. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  577. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  578. MinTime: mcfg.grpcKeepAliveMinTime,
  579. PermitWithoutStream: false,
  580. }))
  581. }
  582. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  583. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  584. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  585. Time: mcfg.grpcKeepAliveInterval,
  586. Timeout: mcfg.grpcKeepAliveTimeout,
  587. }))
  588. }
  589. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  590. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  591. m.useIP = mcfg.useIP
  592. m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
  593. m.InitialCorruptCheck = true
  594. m.LoggerConfig = &zap.Config{
  595. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  596. Development: false,
  597. Sampling: &zap.SamplingConfig{
  598. Initial: 100,
  599. Thereafter: 100,
  600. },
  601. Encoding: "json",
  602. EncoderConfig: zap.NewProductionEncoderConfig(),
  603. OutputPaths: []string{"/dev/null"},
  604. ErrorOutputPaths: []string{"/dev/null"},
  605. }
  606. if os.Getenv("CLUSTER_DEBUG") != "" {
  607. m.LoggerConfig.OutputPaths = []string{"stderr"}
  608. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  609. }
  610. m.Logger, err = m.LoggerConfig.Build()
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. return m
  615. }
  616. // listenGRPC starts a grpc server over a unix domain socket on the member
  617. func (m *member) listenGRPC() error {
  618. // prefix with localhost so cert has right domain
  619. m.grpcAddr = "localhost:" + m.Name
  620. if m.useIP { // for IP-only TLS certs
  621. m.grpcAddr = "127.0.0.1:" + m.Name
  622. }
  623. l, err := transport.NewUnixListener(m.grpcAddr)
  624. if err != nil {
  625. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  626. }
  627. m.grpcBridge, err = newBridge(m.grpcAddr)
  628. if err != nil {
  629. l.Close()
  630. return err
  631. }
  632. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  633. m.grpcListener = l
  634. return nil
  635. }
  636. func (m *member) ElectionTimeout() time.Duration {
  637. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  638. }
  639. func (m *member) ID() types.ID { return m.s.ID() }
  640. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  641. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  642. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  643. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  644. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  645. // NewClientV3 creates a new grpc client connection to the member
  646. func NewClientV3(m *member) (*clientv3.Client, error) {
  647. if m.grpcAddr == "" {
  648. return nil, fmt.Errorf("member not configured for grpc")
  649. }
  650. cfg := clientv3.Config{
  651. Endpoints: []string{m.grpcAddr},
  652. DialTimeout: 5 * time.Second,
  653. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  654. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  655. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  656. }
  657. if m.ClientTLSInfo != nil {
  658. tls, err := m.ClientTLSInfo.ClientConfig()
  659. if err != nil {
  660. return nil, err
  661. }
  662. cfg.TLS = tls
  663. }
  664. if m.DialOptions != nil {
  665. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  666. }
  667. return newClientV3(cfg)
  668. }
  669. // Clone returns a member with the same server configuration. The returned
  670. // member will not set PeerListeners and ClientListeners.
  671. func (m *member) Clone(t testing.TB) *member {
  672. mm := &member{}
  673. mm.ServerConfig = m.ServerConfig
  674. var err error
  675. clientURLStrs := m.ClientURLs.StringSlice()
  676. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  677. if err != nil {
  678. // this should never fail
  679. panic(err)
  680. }
  681. peerURLStrs := m.PeerURLs.StringSlice()
  682. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  683. if err != nil {
  684. // this should never fail
  685. panic(err)
  686. }
  687. clusterStr := m.InitialPeerURLsMap.String()
  688. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  689. if err != nil {
  690. // this should never fail
  691. panic(err)
  692. }
  693. mm.InitialClusterToken = m.InitialClusterToken
  694. mm.ElectionTicks = m.ElectionTicks
  695. mm.PeerTLSInfo = m.PeerTLSInfo
  696. mm.ClientTLSInfo = m.ClientTLSInfo
  697. return mm
  698. }
  699. // Launch starts a member based on ServerConfig, PeerListeners
  700. // and ClientListeners.
  701. func (m *member) Launch() error {
  702. lg.Info(
  703. "launching a member",
  704. zap.String("name", m.Name),
  705. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  706. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  707. zap.String("grpc-address", m.grpcAddr),
  708. )
  709. var err error
  710. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  711. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  712. }
  713. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  714. m.s.Start()
  715. var peerTLScfg *tls.Config
  716. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  717. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  718. return err
  719. }
  720. }
  721. if m.grpcListener != nil {
  722. var (
  723. tlscfg *tls.Config
  724. )
  725. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  726. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  727. if err != nil {
  728. return err
  729. }
  730. }
  731. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  732. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  733. m.serverClient = v3client.New(m.s)
  734. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  735. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  736. go m.grpcServer.Serve(m.grpcListener)
  737. }
  738. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  739. h := (http.Handler)(m.raftHandler)
  740. if m.grpcListener != nil {
  741. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  742. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  743. m.grpcServerPeer.ServeHTTP(w, r)
  744. } else {
  745. m.raftHandler.ServeHTTP(w, r)
  746. }
  747. })
  748. }
  749. for _, ln := range m.PeerListeners {
  750. cm := cmux.New(ln)
  751. // don't hang on matcher after closing listener
  752. cm.SetReadTimeout(time.Second)
  753. if m.grpcServer != nil {
  754. grpcl := cm.Match(cmux.HTTP2())
  755. go m.grpcServerPeer.Serve(grpcl)
  756. }
  757. // serve http1/http2 rafthttp/grpc
  758. ll := cm.Match(cmux.Any())
  759. if peerTLScfg != nil {
  760. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  761. return err
  762. }
  763. }
  764. hs := &httptest.Server{
  765. Listener: ll,
  766. Config: &http.Server{
  767. Handler: h,
  768. TLSConfig: peerTLScfg,
  769. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  770. },
  771. TLS: peerTLScfg,
  772. }
  773. hs.Start()
  774. donec := make(chan struct{})
  775. go func() {
  776. defer close(donec)
  777. cm.Serve()
  778. }()
  779. closer := func() {
  780. ll.Close()
  781. hs.CloseClientConnections()
  782. hs.Close()
  783. <-donec
  784. }
  785. m.serverClosers = append(m.serverClosers, closer)
  786. }
  787. for _, ln := range m.ClientListeners {
  788. hs := &httptest.Server{
  789. Listener: ln,
  790. Config: &http.Server{
  791. Handler: v2http.NewClientHandler(
  792. m.Logger,
  793. m.s,
  794. m.ServerConfig.ReqTimeout(),
  795. ),
  796. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  797. },
  798. }
  799. if m.ClientTLSInfo == nil {
  800. hs.Start()
  801. } else {
  802. info := m.ClientTLSInfo
  803. hs.TLS, err = info.ServerConfig()
  804. if err != nil {
  805. return err
  806. }
  807. // baseConfig is called on initial TLS handshake start.
  808. //
  809. // Previously,
  810. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  811. // 2. Server calls (*tls.Config).GetCertificate iff:
  812. // - Server's (*tls.Config).Certificates is not empty, or
  813. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  814. //
  815. // When (*tls.Config).Certificates is always populated on initial handshake,
  816. // client is expected to provide a valid matching SNI to pass the TLS
  817. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  818. // TLS assets. However, a cert whose SAN field does not include domain names
  819. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  820. // it was never able to trigger TLS reload on initial handshake; first
  821. // ceritifcate object was being used, never being updated.
  822. //
  823. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  824. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  825. // rest of the certificates on every new TLS connection, even when client
  826. // SNI is empty (e.g. cert only includes IPs).
  827. //
  828. // This introduces another problem with "httptest.Server":
  829. // when server initial certificates are empty, certificates
  830. // are overwritten by Go's internal test certs, which have
  831. // different SAN fields (e.g. example.com). To work around,
  832. // re-overwrite (*tls.Config).Certificates before starting
  833. // test server.
  834. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  835. if err != nil {
  836. return err
  837. }
  838. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  839. hs.StartTLS()
  840. }
  841. closer := func() {
  842. ln.Close()
  843. hs.CloseClientConnections()
  844. hs.Close()
  845. }
  846. m.serverClosers = append(m.serverClosers, closer)
  847. }
  848. lg.Info(
  849. "launched a member",
  850. zap.String("name", m.Name),
  851. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  852. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  853. zap.String("grpc-address", m.grpcAddr),
  854. )
  855. return nil
  856. }
  857. func (m *member) WaitOK(t testing.TB) {
  858. m.WaitStarted(t)
  859. for m.s.Leader() == 0 {
  860. time.Sleep(tickDuration)
  861. }
  862. }
  863. func (m *member) WaitStarted(t testing.TB) {
  864. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  865. kapi := client.NewKeysAPI(cc)
  866. for {
  867. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  868. _, err := kapi.Get(ctx, "/", nil)
  869. if err != nil {
  870. time.Sleep(tickDuration)
  871. continue
  872. }
  873. cancel()
  874. break
  875. }
  876. }
  877. func WaitClientV3(t testing.TB, kv clientv3.KV) {
  878. timeout := time.Now().Add(requestTimeout)
  879. var err error
  880. for time.Now().Before(timeout) {
  881. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  882. _, err = kv.Get(ctx, "/")
  883. cancel()
  884. if err == nil {
  885. return
  886. }
  887. time.Sleep(tickDuration)
  888. }
  889. if err != nil {
  890. t.Fatalf("timed out waiting for client: %v", err)
  891. }
  892. }
  893. func (m *member) URL() string { return m.ClientURLs[0].String() }
  894. func (m *member) Pause() {
  895. m.raftHandler.Pause()
  896. m.s.PauseSending()
  897. }
  898. func (m *member) Resume() {
  899. m.raftHandler.Resume()
  900. m.s.ResumeSending()
  901. }
  902. // Close stops the member's etcdserver and closes its connections
  903. func (m *member) Close() {
  904. if m.grpcBridge != nil {
  905. m.grpcBridge.Close()
  906. m.grpcBridge = nil
  907. }
  908. if m.serverClient != nil {
  909. m.serverClient.Close()
  910. m.serverClient = nil
  911. }
  912. if m.grpcServer != nil {
  913. m.grpcServer.Stop()
  914. m.grpcServer.GracefulStop()
  915. m.grpcServer = nil
  916. m.grpcServerPeer.Stop()
  917. m.grpcServerPeer.GracefulStop()
  918. m.grpcServerPeer = nil
  919. }
  920. m.s.HardStop()
  921. for _, f := range m.serverClosers {
  922. f()
  923. }
  924. }
  925. // Stop stops the member, but the data dir of the member is preserved.
  926. func (m *member) Stop(t testing.TB) {
  927. lg.Info(
  928. "stopping a member",
  929. zap.String("name", m.Name),
  930. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  931. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  932. zap.String("grpc-address", m.grpcAddr),
  933. )
  934. m.Close()
  935. m.serverClosers = nil
  936. lg.Info(
  937. "stopped a member",
  938. zap.String("name", m.Name),
  939. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  940. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  941. zap.String("grpc-address", m.grpcAddr),
  942. )
  943. }
  944. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  945. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  946. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  947. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  948. time.Sleep(interval)
  949. }
  950. return m.s.Lead()
  951. }
  952. // StopNotify unblocks when a member stop completes
  953. func (m *member) StopNotify() <-chan struct{} {
  954. return m.s.StopNotify()
  955. }
  956. // Restart starts the member using the preserved data dir.
  957. func (m *member) Restart(t testing.TB) error {
  958. lg.Info(
  959. "restarting a member",
  960. zap.String("name", m.Name),
  961. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  962. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  963. zap.String("grpc-address", m.grpcAddr),
  964. )
  965. newPeerListeners := make([]net.Listener, 0)
  966. for _, ln := range m.PeerListeners {
  967. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  968. }
  969. m.PeerListeners = newPeerListeners
  970. newClientListeners := make([]net.Listener, 0)
  971. for _, ln := range m.ClientListeners {
  972. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  973. }
  974. m.ClientListeners = newClientListeners
  975. if m.grpcListener != nil {
  976. if err := m.listenGRPC(); err != nil {
  977. t.Fatal(err)
  978. }
  979. }
  980. err := m.Launch()
  981. lg.Info(
  982. "restarted a member",
  983. zap.String("name", m.Name),
  984. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  985. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  986. zap.String("grpc-address", m.grpcAddr),
  987. zap.Error(err),
  988. )
  989. return err
  990. }
  991. // Terminate stops the member and removes the data dir.
  992. func (m *member) Terminate(t testing.TB) {
  993. lg.Info(
  994. "terminating a member",
  995. zap.String("name", m.Name),
  996. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  997. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  998. zap.String("grpc-address", m.grpcAddr),
  999. )
  1000. m.Close()
  1001. if !m.keepDataDirTerminate {
  1002. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  1003. t.Fatal(err)
  1004. }
  1005. }
  1006. lg.Info(
  1007. "terminated a member",
  1008. zap.String("name", m.Name),
  1009. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  1010. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  1011. zap.String("grpc-address", m.grpcAddr),
  1012. )
  1013. }
  1014. // Metric gets the metric value for a member
  1015. func (m *member) Metric(metricName string) (string, error) {
  1016. cfgtls := transport.TLSInfo{}
  1017. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  1018. if err != nil {
  1019. return "", err
  1020. }
  1021. cli := &http.Client{Transport: tr}
  1022. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1023. if err != nil {
  1024. return "", err
  1025. }
  1026. defer resp.Body.Close()
  1027. b, rerr := ioutil.ReadAll(resp.Body)
  1028. if rerr != nil {
  1029. return "", rerr
  1030. }
  1031. lines := strings.Split(string(b), "\n")
  1032. for _, l := range lines {
  1033. if strings.HasPrefix(l, metricName) {
  1034. return strings.Split(l, " ")[1], nil
  1035. }
  1036. }
  1037. return "", nil
  1038. }
  1039. // InjectPartition drops connections from m to others, vice versa.
  1040. func (m *member) InjectPartition(t testing.TB, others ...*member) {
  1041. for _, other := range others {
  1042. m.s.CutPeer(other.s.ID())
  1043. other.s.CutPeer(m.s.ID())
  1044. }
  1045. }
  1046. // RecoverPartition recovers connections from m to others, vice versa.
  1047. func (m *member) RecoverPartition(t testing.TB, others ...*member) {
  1048. for _, other := range others {
  1049. m.s.MendPeer(other.s.ID())
  1050. other.s.MendPeer(m.s.ID())
  1051. }
  1052. }
  1053. func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
  1054. cfgtls := transport.TLSInfo{}
  1055. if tls != nil {
  1056. cfgtls = *tls
  1057. }
  1058. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1059. c, err := client.New(cfg)
  1060. if err != nil {
  1061. t.Fatal(err)
  1062. }
  1063. return c
  1064. }
  1065. func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
  1066. // tick in integration test is short, so 1s dial timeout could play well.
  1067. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1068. if err != nil {
  1069. t.Fatal(err)
  1070. }
  1071. return tr
  1072. }
  1073. type SortableMemberSliceByPeerURLs []client.Member
  1074. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1075. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1076. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1077. }
  1078. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1079. type ClusterV3 struct {
  1080. *cluster
  1081. mu sync.Mutex
  1082. clients []*clientv3.Client
  1083. }
  1084. // NewClusterV3 returns a launched cluster with a grpc client connection
  1085. // for each cluster member.
  1086. func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
  1087. cfg.UseGRPC = true
  1088. if os.Getenv("CLIENT_DEBUG") != "" {
  1089. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1090. }
  1091. clus := &ClusterV3{
  1092. cluster: NewClusterByConfig(t, cfg),
  1093. }
  1094. clus.Launch(t)
  1095. if !cfg.SkipCreatingClient {
  1096. for _, m := range clus.Members {
  1097. client, err := NewClientV3(m)
  1098. if err != nil {
  1099. t.Fatalf("cannot create client: %v", err)
  1100. }
  1101. clus.clients = append(clus.clients, client)
  1102. }
  1103. }
  1104. return clus
  1105. }
  1106. func (c *ClusterV3) TakeClient(idx int) {
  1107. c.mu.Lock()
  1108. c.clients[idx] = nil
  1109. c.mu.Unlock()
  1110. }
  1111. func (c *ClusterV3) Terminate(t testing.TB) {
  1112. c.mu.Lock()
  1113. for _, client := range c.clients {
  1114. if client == nil {
  1115. continue
  1116. }
  1117. if err := client.Close(); err != nil {
  1118. t.Error(err)
  1119. }
  1120. }
  1121. c.mu.Unlock()
  1122. c.cluster.Terminate(t)
  1123. }
  1124. func (c *ClusterV3) RandClient() *clientv3.Client {
  1125. return c.clients[rand.Intn(len(c.clients))]
  1126. }
  1127. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1128. return c.clients[i]
  1129. }
  1130. type grpcAPI struct {
  1131. // Cluster is the cluster API for the client's connection.
  1132. Cluster pb.ClusterClient
  1133. // KV is the keyvalue API for the client's connection.
  1134. KV pb.KVClient
  1135. // Lease is the lease API for the client's connection.
  1136. Lease pb.LeaseClient
  1137. // Watch is the watch API for the client's connection.
  1138. Watch pb.WatchClient
  1139. // Maintenance is the maintenance API for the client's connection.
  1140. Maintenance pb.MaintenanceClient
  1141. // Auth is the authentication API for the client's connection.
  1142. Auth pb.AuthClient
  1143. // Lock is the lock API for the client's connection.
  1144. Lock lockpb.LockClient
  1145. // Election is the election API for the client's connection.
  1146. Election epb.ElectionClient
  1147. }