cluster.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "go.etcd.io/etcd/client"
  34. "go.etcd.io/etcd/clientv3"
  35. "go.etcd.io/etcd/embed"
  36. "go.etcd.io/etcd/etcdserver"
  37. "go.etcd.io/etcd/etcdserver/api/etcdhttp"
  38. "go.etcd.io/etcd/etcdserver/api/rafthttp"
  39. "go.etcd.io/etcd/etcdserver/api/v2http"
  40. "go.etcd.io/etcd/etcdserver/api/v3client"
  41. "go.etcd.io/etcd/etcdserver/api/v3election"
  42. epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  43. "go.etcd.io/etcd/etcdserver/api/v3lock"
  44. lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  46. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  47. "go.etcd.io/etcd/pkg/logutil"
  48. "go.etcd.io/etcd/pkg/testutil"
  49. "go.etcd.io/etcd/pkg/tlsutil"
  50. "go.etcd.io/etcd/pkg/transport"
  51. "go.etcd.io/etcd/pkg/types"
  52. "github.com/soheilhy/cmux"
  53. "go.uber.org/zap"
  54. "golang.org/x/crypto/bcrypt"
  55. "google.golang.org/grpc"
  56. "google.golang.org/grpc/grpclog"
  57. "google.golang.org/grpc/keepalive"
  58. )
  59. const (
  60. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  61. RequestWaitTimeout = 3 * time.Second
  62. tickDuration = 10 * time.Millisecond
  63. requestTimeout = 20 * time.Second
  64. clusterName = "etcd"
  65. basePort = 21000
  66. URLScheme = "unix"
  67. URLSchemeTLS = "unixs"
  68. )
  69. var (
  70. electionTicks = 10
  71. // integration test uses unique ports, counting up, to listen for each
  72. // member, ensuring restarted members can listen on the same port again.
  73. localListenCount = int64(0)
  74. testTLSInfo = transport.TLSInfo{
  75. KeyFile: "./fixtures/server.key.insecure",
  76. CertFile: "./fixtures/server.crt",
  77. TrustedCAFile: "./fixtures/ca.crt",
  78. ClientCertAuth: true,
  79. }
  80. testTLSInfoIP = transport.TLSInfo{
  81. KeyFile: "./fixtures/server-ip.key.insecure",
  82. CertFile: "./fixtures/server-ip.crt",
  83. TrustedCAFile: "./fixtures/ca.crt",
  84. ClientCertAuth: true,
  85. }
  86. testTLSInfoExpired = transport.TLSInfo{
  87. KeyFile: "./fixtures-expired/server.key.insecure",
  88. CertFile: "./fixtures-expired/server.crt",
  89. TrustedCAFile: "./fixtures-expired/ca.crt",
  90. ClientCertAuth: true,
  91. }
  92. testTLSInfoExpiredIP = transport.TLSInfo{
  93. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  94. CertFile: "./fixtures-expired/server-ip.crt",
  95. TrustedCAFile: "./fixtures-expired/ca.crt",
  96. ClientCertAuth: true,
  97. }
  98. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  99. lg = zap.NewNop()
  100. )
  101. func init() {
  102. if os.Getenv("CLUSTER_DEBUG") != "" {
  103. lg, _ = zap.NewProduction()
  104. }
  105. }
  106. type ClusterConfig struct {
  107. Size int
  108. PeerTLS *transport.TLSInfo
  109. ClientTLS *transport.TLSInfo
  110. DiscoveryURL string
  111. AuthToken string
  112. UseGRPC bool
  113. QuotaBackendBytes int64
  114. MaxTxnOps uint
  115. MaxRequestBytes uint
  116. SnapshotCount uint64
  117. SnapshotCatchUpEntries uint64
  118. GRPCKeepAliveMinTime time.Duration
  119. GRPCKeepAliveInterval time.Duration
  120. GRPCKeepAliveTimeout time.Duration
  121. // SkipCreatingClient to skip creating clients for each member.
  122. SkipCreatingClient bool
  123. ClientMaxCallSendMsgSize int
  124. ClientMaxCallRecvMsgSize int
  125. // UseIP is true to use only IP for gRPC requests.
  126. UseIP bool
  127. LeaseCheckpointInterval time.Duration
  128. }
  129. type cluster struct {
  130. cfg *ClusterConfig
  131. Members []*member
  132. }
  133. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  134. if tls == nil {
  135. return URLScheme
  136. }
  137. return URLSchemeTLS
  138. }
  139. func (c *cluster) fillClusterForMembers() error {
  140. if c.cfg.DiscoveryURL != "" {
  141. // cluster will be discovered
  142. return nil
  143. }
  144. addrs := make([]string, 0)
  145. for _, m := range c.Members {
  146. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  147. for _, l := range m.PeerListeners {
  148. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  149. }
  150. }
  151. clusterStr := strings.Join(addrs, ",")
  152. var err error
  153. for _, m := range c.Members {
  154. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  155. if err != nil {
  156. return err
  157. }
  158. }
  159. return nil
  160. }
  161. func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
  162. c := &cluster{cfg: cfg}
  163. ms := make([]*member, cfg.Size)
  164. for i := 0; i < cfg.Size; i++ {
  165. ms[i] = c.mustNewMember(t)
  166. }
  167. c.Members = ms
  168. if err := c.fillClusterForMembers(); err != nil {
  169. t.Fatal(err)
  170. }
  171. return c
  172. }
  173. // NewCluster returns an unlaunched cluster of the given size which has been
  174. // set to use static bootstrap.
  175. func NewCluster(t testing.TB, size int) *cluster {
  176. return newCluster(t, &ClusterConfig{Size: size})
  177. }
  178. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  179. func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
  180. return newCluster(t, cfg)
  181. }
  182. func (c *cluster) Launch(t testing.TB) {
  183. errc := make(chan error)
  184. for _, m := range c.Members {
  185. // Members are launched in separate goroutines because if they boot
  186. // using discovery url, they have to wait for others to register to continue.
  187. go func(m *member) {
  188. errc <- m.Launch()
  189. }(m)
  190. }
  191. for range c.Members {
  192. if err := <-errc; err != nil {
  193. t.Fatalf("error setting up member: %v", err)
  194. }
  195. }
  196. // wait cluster to be stable to receive future client requests
  197. c.waitMembersMatch(t, c.HTTPMembers())
  198. c.waitVersion()
  199. }
  200. func (c *cluster) URL(i int) string {
  201. return c.Members[i].ClientURLs[0].String()
  202. }
  203. // URLs returns a list of all active client URLs in the cluster
  204. func (c *cluster) URLs() []string {
  205. return getMembersURLs(c.Members)
  206. }
  207. func getMembersURLs(members []*member) []string {
  208. urls := make([]string, 0)
  209. for _, m := range members {
  210. select {
  211. case <-m.s.StopNotify():
  212. continue
  213. default:
  214. }
  215. for _, u := range m.ClientURLs {
  216. urls = append(urls, u.String())
  217. }
  218. }
  219. return urls
  220. }
  221. // HTTPMembers returns a list of all active members as client.Members
  222. func (c *cluster) HTTPMembers() []client.Member {
  223. ms := []client.Member{}
  224. for _, m := range c.Members {
  225. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  226. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  227. cm := client.Member{Name: m.Name}
  228. for _, ln := range m.PeerListeners {
  229. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  230. }
  231. for _, ln := range m.ClientListeners {
  232. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  233. }
  234. ms = append(ms, cm)
  235. }
  236. return ms
  237. }
  238. func (c *cluster) mustNewMember(t testing.TB) *member {
  239. m := mustNewMember(t,
  240. memberConfig{
  241. name: c.name(rand.Int()),
  242. authToken: c.cfg.AuthToken,
  243. peerTLS: c.cfg.PeerTLS,
  244. clientTLS: c.cfg.ClientTLS,
  245. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  246. maxTxnOps: c.cfg.MaxTxnOps,
  247. maxRequestBytes: c.cfg.MaxRequestBytes,
  248. snapshotCount: c.cfg.SnapshotCount,
  249. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  250. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  251. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  252. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  253. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  254. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  255. useIP: c.cfg.UseIP,
  256. leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
  257. })
  258. m.DiscoveryURL = c.cfg.DiscoveryURL
  259. if c.cfg.UseGRPC {
  260. if err := m.listenGRPC(); err != nil {
  261. t.Fatal(err)
  262. }
  263. }
  264. return m
  265. }
  266. func (c *cluster) addMember(t testing.TB) {
  267. m := c.mustNewMember(t)
  268. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  269. // send add request to the cluster
  270. var err error
  271. for i := 0; i < len(c.Members); i++ {
  272. clientURL := c.URL(i)
  273. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  274. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  275. break
  276. }
  277. }
  278. if err != nil {
  279. t.Fatalf("add member failed on all members error: %v", err)
  280. }
  281. m.InitialPeerURLsMap = types.URLsMap{}
  282. for _, mm := range c.Members {
  283. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  284. }
  285. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  286. m.NewCluster = false
  287. if err := m.Launch(); err != nil {
  288. t.Fatal(err)
  289. }
  290. c.Members = append(c.Members, m)
  291. // wait cluster to be stable to receive future client requests
  292. c.waitMembersMatch(t, c.HTTPMembers())
  293. }
  294. func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
  295. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  296. ma := client.NewMembersAPI(cc)
  297. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  298. _, err := ma.Add(ctx, peerURL)
  299. cancel()
  300. if err != nil {
  301. return err
  302. }
  303. // wait for the add node entry applied in the cluster
  304. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  305. c.waitMembersMatch(t, members)
  306. return nil
  307. }
  308. func (c *cluster) AddMember(t testing.TB) {
  309. c.addMember(t)
  310. }
  311. func (c *cluster) RemoveMember(t testing.TB, id uint64) {
  312. if err := c.removeMember(t, id); err != nil {
  313. t.Fatal(err)
  314. }
  315. }
  316. func (c *cluster) removeMember(t testing.TB, id uint64) error {
  317. // send remove request to the cluster
  318. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  319. ma := client.NewMembersAPI(cc)
  320. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  321. err := ma.Remove(ctx, types.ID(id).String())
  322. cancel()
  323. if err != nil {
  324. return err
  325. }
  326. newMembers := make([]*member, 0)
  327. for _, m := range c.Members {
  328. if uint64(m.s.ID()) != id {
  329. newMembers = append(newMembers, m)
  330. } else {
  331. select {
  332. case <-m.s.StopNotify():
  333. m.Terminate(t)
  334. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  335. // TODO: remove connection write timeout by selecting on http response closeNotifier
  336. // blocking on https://github.com/golang/go/issues/9524
  337. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  338. t.Fatalf("failed to remove member %s in time", m.s.ID())
  339. }
  340. }
  341. }
  342. c.Members = newMembers
  343. c.waitMembersMatch(t, c.HTTPMembers())
  344. return nil
  345. }
  346. func (c *cluster) Terminate(t testing.TB) {
  347. var wg sync.WaitGroup
  348. wg.Add(len(c.Members))
  349. for _, m := range c.Members {
  350. go func(mm *member) {
  351. defer wg.Done()
  352. mm.Terminate(t)
  353. }(m)
  354. }
  355. wg.Wait()
  356. }
  357. func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
  358. for _, u := range c.URLs() {
  359. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  360. ma := client.NewMembersAPI(cc)
  361. for {
  362. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  363. ms, err := ma.List(ctx)
  364. cancel()
  365. if err == nil && isMembersEqual(ms, membs) {
  366. break
  367. }
  368. time.Sleep(tickDuration)
  369. }
  370. }
  371. }
  372. func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
  373. // waitLeader waits until given members agree on the same leader.
  374. func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
  375. possibleLead := make(map[uint64]bool)
  376. var lead uint64
  377. for _, m := range membs {
  378. possibleLead[uint64(m.s.ID())] = true
  379. }
  380. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  381. kapi := client.NewKeysAPI(cc)
  382. // ensure leader is up via linearizable get
  383. for {
  384. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  385. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  386. cancel()
  387. if err == nil || strings.Contains(err.Error(), "Key not found") {
  388. break
  389. }
  390. }
  391. for lead == 0 || !possibleLead[lead] {
  392. lead = 0
  393. for _, m := range membs {
  394. select {
  395. case <-m.s.StopNotify():
  396. continue
  397. default:
  398. }
  399. if lead != 0 && lead != m.s.Lead() {
  400. lead = 0
  401. time.Sleep(10 * tickDuration)
  402. break
  403. }
  404. lead = m.s.Lead()
  405. }
  406. }
  407. for i, m := range membs {
  408. if uint64(m.s.ID()) == lead {
  409. return i
  410. }
  411. }
  412. return -1
  413. }
  414. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  415. // waitNoLeader waits until given members lose leader.
  416. func (c *cluster) waitNoLeader(membs []*member) {
  417. noLeader := false
  418. for !noLeader {
  419. noLeader = true
  420. for _, m := range membs {
  421. select {
  422. case <-m.s.StopNotify():
  423. continue
  424. default:
  425. }
  426. if m.s.Lead() != 0 {
  427. noLeader = false
  428. time.Sleep(10 * tickDuration)
  429. break
  430. }
  431. }
  432. }
  433. }
  434. func (c *cluster) waitVersion() {
  435. for _, m := range c.Members {
  436. for {
  437. if m.s.ClusterVersion() != nil {
  438. break
  439. }
  440. time.Sleep(tickDuration)
  441. }
  442. }
  443. }
  444. func (c *cluster) name(i int) string {
  445. return fmt.Sprint(i)
  446. }
  447. // isMembersEqual checks whether two members equal except ID field.
  448. // The given wmembs should always set ID field to empty string.
  449. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  450. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  451. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  452. for i := range membs {
  453. membs[i].ID = ""
  454. }
  455. return reflect.DeepEqual(membs, wmembs)
  456. }
  457. func newLocalListener(t testing.TB) net.Listener {
  458. c := atomic.AddInt64(&localListenCount, 1)
  459. // Go 1.8+ allows only numbers in port
  460. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  461. return NewListenerWithAddr(t, addr)
  462. }
  463. func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
  464. l, err := transport.NewUnixListener(addr)
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. return l
  469. }
  470. type member struct {
  471. etcdserver.ServerConfig
  472. PeerListeners, ClientListeners []net.Listener
  473. grpcListener net.Listener
  474. // PeerTLSInfo enables peer TLS when set
  475. PeerTLSInfo *transport.TLSInfo
  476. // ClientTLSInfo enables client TLS when set
  477. ClientTLSInfo *transport.TLSInfo
  478. DialOptions []grpc.DialOption
  479. raftHandler *testutil.PauseableHandler
  480. s *etcdserver.EtcdServer
  481. serverClosers []func()
  482. grpcServerOpts []grpc.ServerOption
  483. grpcServer *grpc.Server
  484. grpcServerPeer *grpc.Server
  485. grpcAddr string
  486. grpcBridge *bridge
  487. // serverClient is a clientv3 that directly calls the etcdserver.
  488. serverClient *clientv3.Client
  489. keepDataDirTerminate bool
  490. clientMaxCallSendMsgSize int
  491. clientMaxCallRecvMsgSize int
  492. useIP bool
  493. }
  494. func (m *member) GRPCAddr() string { return m.grpcAddr }
  495. type memberConfig struct {
  496. name string
  497. peerTLS *transport.TLSInfo
  498. clientTLS *transport.TLSInfo
  499. authToken string
  500. quotaBackendBytes int64
  501. maxTxnOps uint
  502. maxRequestBytes uint
  503. snapshotCount uint64
  504. snapshotCatchUpEntries uint64
  505. grpcKeepAliveMinTime time.Duration
  506. grpcKeepAliveInterval time.Duration
  507. grpcKeepAliveTimeout time.Duration
  508. clientMaxCallSendMsgSize int
  509. clientMaxCallRecvMsgSize int
  510. useIP bool
  511. leaseCheckpointInterval time.Duration
  512. }
  513. // mustNewMember return an inited member with the given name. If peerTLS is
  514. // set, it will use https scheme to communicate between peers.
  515. func mustNewMember(t testing.TB, mcfg memberConfig) *member {
  516. var err error
  517. m := &member{}
  518. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  519. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  520. pln := newLocalListener(t)
  521. m.PeerListeners = []net.Listener{pln}
  522. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  523. if err != nil {
  524. t.Fatal(err)
  525. }
  526. m.PeerTLSInfo = mcfg.peerTLS
  527. cln := newLocalListener(t)
  528. m.ClientListeners = []net.Listener{cln}
  529. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. m.ClientTLSInfo = mcfg.clientTLS
  534. m.Name = mcfg.name
  535. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  540. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  541. if err != nil {
  542. t.Fatal(err)
  543. }
  544. m.InitialClusterToken = clusterName
  545. m.NewCluster = true
  546. m.BootstrapTimeout = 10 * time.Millisecond
  547. if m.PeerTLSInfo != nil {
  548. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  549. }
  550. m.ElectionTicks = electionTicks
  551. m.InitialElectionTickAdvance = true
  552. m.TickMs = uint(tickDuration / time.Millisecond)
  553. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  554. m.MaxTxnOps = mcfg.maxTxnOps
  555. if m.MaxTxnOps == 0 {
  556. m.MaxTxnOps = embed.DefaultMaxTxnOps
  557. }
  558. m.MaxRequestBytes = mcfg.maxRequestBytes
  559. if m.MaxRequestBytes == 0 {
  560. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  561. }
  562. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  563. if mcfg.snapshotCount != 0 {
  564. m.SnapshotCount = mcfg.snapshotCount
  565. }
  566. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  567. if mcfg.snapshotCatchUpEntries != 0 {
  568. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  569. }
  570. // for the purpose of integration testing, simple token is enough
  571. m.AuthToken = "simple"
  572. if mcfg.authToken != "" {
  573. m.AuthToken = mcfg.authToken
  574. }
  575. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  576. m.grpcServerOpts = []grpc.ServerOption{}
  577. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  578. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  579. MinTime: mcfg.grpcKeepAliveMinTime,
  580. PermitWithoutStream: false,
  581. }))
  582. }
  583. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  584. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  585. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  586. Time: mcfg.grpcKeepAliveInterval,
  587. Timeout: mcfg.grpcKeepAliveTimeout,
  588. }))
  589. }
  590. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  591. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  592. m.useIP = mcfg.useIP
  593. m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
  594. m.InitialCorruptCheck = true
  595. lcfg := logutil.DefaultZapLoggerConfig
  596. m.LoggerConfig = &lcfg
  597. m.LoggerConfig.OutputPaths = []string{"/dev/null"}
  598. m.LoggerConfig.ErrorOutputPaths = []string{"/dev/null"}
  599. if os.Getenv("CLUSTER_DEBUG") != "" {
  600. m.LoggerConfig.OutputPaths = []string{"stderr"}
  601. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  602. }
  603. m.Logger, err = m.LoggerConfig.Build()
  604. if err != nil {
  605. t.Fatal(err)
  606. }
  607. return m
  608. }
  609. // listenGRPC starts a grpc server over a unix domain socket on the member
  610. func (m *member) listenGRPC() error {
  611. // prefix with localhost so cert has right domain
  612. m.grpcAddr = "localhost:" + m.Name
  613. if m.useIP { // for IP-only TLS certs
  614. m.grpcAddr = "127.0.0.1:" + m.Name
  615. }
  616. l, err := transport.NewUnixListener(m.grpcAddr)
  617. if err != nil {
  618. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  619. }
  620. m.grpcBridge, err = newBridge(m.grpcAddr)
  621. if err != nil {
  622. l.Close()
  623. return err
  624. }
  625. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  626. m.grpcListener = l
  627. return nil
  628. }
  629. func (m *member) ElectionTimeout() time.Duration {
  630. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  631. }
  632. func (m *member) ID() types.ID { return m.s.ID() }
  633. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  634. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  635. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  636. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  637. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  638. // NewClientV3 creates a new grpc client connection to the member
  639. func NewClientV3(m *member) (*clientv3.Client, error) {
  640. if m.grpcAddr == "" {
  641. return nil, fmt.Errorf("member not configured for grpc")
  642. }
  643. cfg := clientv3.Config{
  644. Endpoints: []string{m.grpcAddr},
  645. DialTimeout: 5 * time.Second,
  646. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  647. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  648. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  649. }
  650. if m.ClientTLSInfo != nil {
  651. tls, err := m.ClientTLSInfo.ClientConfig()
  652. if err != nil {
  653. return nil, err
  654. }
  655. cfg.TLS = tls
  656. }
  657. if m.DialOptions != nil {
  658. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  659. }
  660. return newClientV3(cfg)
  661. }
  662. // Clone returns a member with the same server configuration. The returned
  663. // member will not set PeerListeners and ClientListeners.
  664. func (m *member) Clone(t testing.TB) *member {
  665. mm := &member{}
  666. mm.ServerConfig = m.ServerConfig
  667. var err error
  668. clientURLStrs := m.ClientURLs.StringSlice()
  669. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  670. if err != nil {
  671. // this should never fail
  672. panic(err)
  673. }
  674. peerURLStrs := m.PeerURLs.StringSlice()
  675. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  676. if err != nil {
  677. // this should never fail
  678. panic(err)
  679. }
  680. clusterStr := m.InitialPeerURLsMap.String()
  681. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  682. if err != nil {
  683. // this should never fail
  684. panic(err)
  685. }
  686. mm.InitialClusterToken = m.InitialClusterToken
  687. mm.ElectionTicks = m.ElectionTicks
  688. mm.PeerTLSInfo = m.PeerTLSInfo
  689. mm.ClientTLSInfo = m.ClientTLSInfo
  690. return mm
  691. }
  692. // Launch starts a member based on ServerConfig, PeerListeners
  693. // and ClientListeners.
  694. func (m *member) Launch() error {
  695. lg.Info(
  696. "launching a member",
  697. zap.String("name", m.Name),
  698. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  699. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  700. zap.String("grpc-address", m.grpcAddr),
  701. )
  702. var err error
  703. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  704. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  705. }
  706. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  707. m.s.Start()
  708. var peerTLScfg *tls.Config
  709. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  710. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  711. return err
  712. }
  713. }
  714. if m.grpcListener != nil {
  715. var (
  716. tlscfg *tls.Config
  717. )
  718. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  719. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  720. if err != nil {
  721. return err
  722. }
  723. }
  724. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  725. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  726. m.serverClient = v3client.New(m.s)
  727. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  728. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  729. go m.grpcServer.Serve(m.grpcListener)
  730. }
  731. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  732. h := (http.Handler)(m.raftHandler)
  733. if m.grpcListener != nil {
  734. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  735. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  736. m.grpcServerPeer.ServeHTTP(w, r)
  737. } else {
  738. m.raftHandler.ServeHTTP(w, r)
  739. }
  740. })
  741. }
  742. for _, ln := range m.PeerListeners {
  743. cm := cmux.New(ln)
  744. // don't hang on matcher after closing listener
  745. cm.SetReadTimeout(time.Second)
  746. if m.grpcServer != nil {
  747. grpcl := cm.Match(cmux.HTTP2())
  748. go m.grpcServerPeer.Serve(grpcl)
  749. }
  750. // serve http1/http2 rafthttp/grpc
  751. ll := cm.Match(cmux.Any())
  752. if peerTLScfg != nil {
  753. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  754. return err
  755. }
  756. }
  757. hs := &httptest.Server{
  758. Listener: ll,
  759. Config: &http.Server{
  760. Handler: h,
  761. TLSConfig: peerTLScfg,
  762. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  763. },
  764. TLS: peerTLScfg,
  765. }
  766. hs.Start()
  767. donec := make(chan struct{})
  768. go func() {
  769. defer close(donec)
  770. cm.Serve()
  771. }()
  772. closer := func() {
  773. ll.Close()
  774. hs.CloseClientConnections()
  775. hs.Close()
  776. <-donec
  777. }
  778. m.serverClosers = append(m.serverClosers, closer)
  779. }
  780. for _, ln := range m.ClientListeners {
  781. hs := &httptest.Server{
  782. Listener: ln,
  783. Config: &http.Server{
  784. Handler: v2http.NewClientHandler(
  785. m.Logger,
  786. m.s,
  787. m.ServerConfig.ReqTimeout(),
  788. ),
  789. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  790. },
  791. }
  792. if m.ClientTLSInfo == nil {
  793. hs.Start()
  794. } else {
  795. info := m.ClientTLSInfo
  796. hs.TLS, err = info.ServerConfig()
  797. if err != nil {
  798. return err
  799. }
  800. // baseConfig is called on initial TLS handshake start.
  801. //
  802. // Previously,
  803. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  804. // 2. Server calls (*tls.Config).GetCertificate iff:
  805. // - Server's (*tls.Config).Certificates is not empty, or
  806. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  807. //
  808. // When (*tls.Config).Certificates is always populated on initial handshake,
  809. // client is expected to provide a valid matching SNI to pass the TLS
  810. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  811. // TLS assets. However, a cert whose SAN field does not include domain names
  812. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  813. // it was never able to trigger TLS reload on initial handshake; first
  814. // ceritifcate object was being used, never being updated.
  815. //
  816. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  817. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  818. // rest of the certificates on every new TLS connection, even when client
  819. // SNI is empty (e.g. cert only includes IPs).
  820. //
  821. // This introduces another problem with "httptest.Server":
  822. // when server initial certificates are empty, certificates
  823. // are overwritten by Go's internal test certs, which have
  824. // different SAN fields (e.g. example.com). To work around,
  825. // re-overwrite (*tls.Config).Certificates before starting
  826. // test server.
  827. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  828. if err != nil {
  829. return err
  830. }
  831. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  832. hs.StartTLS()
  833. }
  834. closer := func() {
  835. ln.Close()
  836. hs.CloseClientConnections()
  837. hs.Close()
  838. }
  839. m.serverClosers = append(m.serverClosers, closer)
  840. }
  841. lg.Info(
  842. "launched a member",
  843. zap.String("name", m.Name),
  844. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  845. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  846. zap.String("grpc-address", m.grpcAddr),
  847. )
  848. return nil
  849. }
  850. func (m *member) WaitOK(t testing.TB) {
  851. m.WaitStarted(t)
  852. for m.s.Leader() == 0 {
  853. time.Sleep(tickDuration)
  854. }
  855. }
  856. func (m *member) WaitStarted(t testing.TB) {
  857. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  858. kapi := client.NewKeysAPI(cc)
  859. for {
  860. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  861. _, err := kapi.Get(ctx, "/", nil)
  862. if err != nil {
  863. time.Sleep(tickDuration)
  864. continue
  865. }
  866. cancel()
  867. break
  868. }
  869. }
  870. func WaitClientV3(t testing.TB, kv clientv3.KV) {
  871. timeout := time.Now().Add(requestTimeout)
  872. var err error
  873. for time.Now().Before(timeout) {
  874. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  875. _, err = kv.Get(ctx, "/")
  876. cancel()
  877. if err == nil {
  878. return
  879. }
  880. time.Sleep(tickDuration)
  881. }
  882. if err != nil {
  883. t.Fatalf("timed out waiting for client: %v", err)
  884. }
  885. }
  886. func (m *member) URL() string { return m.ClientURLs[0].String() }
  887. func (m *member) Pause() {
  888. m.raftHandler.Pause()
  889. m.s.PauseSending()
  890. }
  891. func (m *member) Resume() {
  892. m.raftHandler.Resume()
  893. m.s.ResumeSending()
  894. }
  895. // Close stops the member's etcdserver and closes its connections
  896. func (m *member) Close() {
  897. if m.grpcBridge != nil {
  898. m.grpcBridge.Close()
  899. m.grpcBridge = nil
  900. }
  901. if m.serverClient != nil {
  902. m.serverClient.Close()
  903. m.serverClient = nil
  904. }
  905. if m.grpcServer != nil {
  906. m.grpcServer.Stop()
  907. m.grpcServer.GracefulStop()
  908. m.grpcServer = nil
  909. m.grpcServerPeer.Stop()
  910. m.grpcServerPeer.GracefulStop()
  911. m.grpcServerPeer = nil
  912. }
  913. m.s.HardStop()
  914. for _, f := range m.serverClosers {
  915. f()
  916. }
  917. }
  918. // Stop stops the member, but the data dir of the member is preserved.
  919. func (m *member) Stop(t testing.TB) {
  920. lg.Info(
  921. "stopping a member",
  922. zap.String("name", m.Name),
  923. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  924. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  925. zap.String("grpc-address", m.grpcAddr),
  926. )
  927. m.Close()
  928. m.serverClosers = nil
  929. lg.Info(
  930. "stopped a member",
  931. zap.String("name", m.Name),
  932. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  933. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  934. zap.String("grpc-address", m.grpcAddr),
  935. )
  936. }
  937. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  938. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  939. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  940. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  941. time.Sleep(interval)
  942. }
  943. return m.s.Lead()
  944. }
  945. // StopNotify unblocks when a member stop completes
  946. func (m *member) StopNotify() <-chan struct{} {
  947. return m.s.StopNotify()
  948. }
  949. // Restart starts the member using the preserved data dir.
  950. func (m *member) Restart(t testing.TB) error {
  951. lg.Info(
  952. "restarting a member",
  953. zap.String("name", m.Name),
  954. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  955. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  956. zap.String("grpc-address", m.grpcAddr),
  957. )
  958. newPeerListeners := make([]net.Listener, 0)
  959. for _, ln := range m.PeerListeners {
  960. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  961. }
  962. m.PeerListeners = newPeerListeners
  963. newClientListeners := make([]net.Listener, 0)
  964. for _, ln := range m.ClientListeners {
  965. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  966. }
  967. m.ClientListeners = newClientListeners
  968. if m.grpcListener != nil {
  969. if err := m.listenGRPC(); err != nil {
  970. t.Fatal(err)
  971. }
  972. }
  973. err := m.Launch()
  974. lg.Info(
  975. "restarted a member",
  976. zap.String("name", m.Name),
  977. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  978. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  979. zap.String("grpc-address", m.grpcAddr),
  980. zap.Error(err),
  981. )
  982. return err
  983. }
  984. // Terminate stops the member and removes the data dir.
  985. func (m *member) Terminate(t testing.TB) {
  986. lg.Info(
  987. "terminating a member",
  988. zap.String("name", m.Name),
  989. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  990. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  991. zap.String("grpc-address", m.grpcAddr),
  992. )
  993. m.Close()
  994. if !m.keepDataDirTerminate {
  995. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  996. t.Fatal(err)
  997. }
  998. }
  999. lg.Info(
  1000. "terminated a member",
  1001. zap.String("name", m.Name),
  1002. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  1003. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  1004. zap.String("grpc-address", m.grpcAddr),
  1005. )
  1006. }
  1007. // Metric gets the metric value for a member
  1008. func (m *member) Metric(metricName string) (string, error) {
  1009. cfgtls := transport.TLSInfo{}
  1010. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  1011. if err != nil {
  1012. return "", err
  1013. }
  1014. cli := &http.Client{Transport: tr}
  1015. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1016. if err != nil {
  1017. return "", err
  1018. }
  1019. defer resp.Body.Close()
  1020. b, rerr := ioutil.ReadAll(resp.Body)
  1021. if rerr != nil {
  1022. return "", rerr
  1023. }
  1024. lines := strings.Split(string(b), "\n")
  1025. for _, l := range lines {
  1026. if strings.HasPrefix(l, metricName) {
  1027. return strings.Split(l, " ")[1], nil
  1028. }
  1029. }
  1030. return "", nil
  1031. }
  1032. // InjectPartition drops connections from m to others, vice versa.
  1033. func (m *member) InjectPartition(t testing.TB, others ...*member) {
  1034. for _, other := range others {
  1035. m.s.CutPeer(other.s.ID())
  1036. other.s.CutPeer(m.s.ID())
  1037. }
  1038. }
  1039. // RecoverPartition recovers connections from m to others, vice versa.
  1040. func (m *member) RecoverPartition(t testing.TB, others ...*member) {
  1041. for _, other := range others {
  1042. m.s.MendPeer(other.s.ID())
  1043. other.s.MendPeer(m.s.ID())
  1044. }
  1045. }
  1046. func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
  1047. cfgtls := transport.TLSInfo{}
  1048. if tls != nil {
  1049. cfgtls = *tls
  1050. }
  1051. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1052. c, err := client.New(cfg)
  1053. if err != nil {
  1054. t.Fatal(err)
  1055. }
  1056. return c
  1057. }
  1058. func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
  1059. // tick in integration test is short, so 1s dial timeout could play well.
  1060. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1061. if err != nil {
  1062. t.Fatal(err)
  1063. }
  1064. return tr
  1065. }
  1066. type SortableMemberSliceByPeerURLs []client.Member
  1067. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1068. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1069. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1070. }
  1071. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1072. type ClusterV3 struct {
  1073. *cluster
  1074. mu sync.Mutex
  1075. clients []*clientv3.Client
  1076. }
  1077. // NewClusterV3 returns a launched cluster with a grpc client connection
  1078. // for each cluster member.
  1079. func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
  1080. cfg.UseGRPC = true
  1081. if os.Getenv("CLIENT_DEBUG") != "" {
  1082. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1083. }
  1084. clus := &ClusterV3{
  1085. cluster: NewClusterByConfig(t, cfg),
  1086. }
  1087. clus.Launch(t)
  1088. if !cfg.SkipCreatingClient {
  1089. for _, m := range clus.Members {
  1090. client, err := NewClientV3(m)
  1091. if err != nil {
  1092. t.Fatalf("cannot create client: %v", err)
  1093. }
  1094. clus.clients = append(clus.clients, client)
  1095. }
  1096. }
  1097. return clus
  1098. }
  1099. func (c *ClusterV3) TakeClient(idx int) {
  1100. c.mu.Lock()
  1101. c.clients[idx] = nil
  1102. c.mu.Unlock()
  1103. }
  1104. func (c *ClusterV3) Terminate(t testing.TB) {
  1105. c.mu.Lock()
  1106. for _, client := range c.clients {
  1107. if client == nil {
  1108. continue
  1109. }
  1110. if err := client.Close(); err != nil {
  1111. t.Error(err)
  1112. }
  1113. }
  1114. c.mu.Unlock()
  1115. c.cluster.Terminate(t)
  1116. }
  1117. func (c *ClusterV3) RandClient() *clientv3.Client {
  1118. return c.clients[rand.Intn(len(c.clients))]
  1119. }
  1120. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1121. return c.clients[i]
  1122. }
  1123. type grpcAPI struct {
  1124. // Cluster is the cluster API for the client's connection.
  1125. Cluster pb.ClusterClient
  1126. // KV is the keyvalue API for the client's connection.
  1127. KV pb.KVClient
  1128. // Lease is the lease API for the client's connection.
  1129. Lease pb.LeaseClient
  1130. // Watch is the watch API for the client's connection.
  1131. Watch pb.WatchClient
  1132. // Maintenance is the maintenance API for the client's connection.
  1133. Maintenance pb.MaintenanceClient
  1134. // Auth is the authentication API for the client's connection.
  1135. Auth pb.AuthClient
  1136. // Lock is the lock API for the client's connection.
  1137. Lock lockpb.LockClient
  1138. // Election is the election API for the client's connection.
  1139. Election epb.ElectionClient
  1140. }