cluster.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "github.com/coreos/etcd/client"
  34. "github.com/coreos/etcd/clientv3"
  35. "github.com/coreos/etcd/embed"
  36. "github.com/coreos/etcd/etcdserver"
  37. "github.com/coreos/etcd/etcdserver/api/etcdhttp"
  38. "github.com/coreos/etcd/etcdserver/api/rafthttp"
  39. "github.com/coreos/etcd/etcdserver/api/v2http"
  40. "github.com/coreos/etcd/etcdserver/api/v3client"
  41. "github.com/coreos/etcd/etcdserver/api/v3election"
  42. epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  43. "github.com/coreos/etcd/etcdserver/api/v3lock"
  44. lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  46. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  47. "github.com/coreos/etcd/pkg/testutil"
  48. "github.com/coreos/etcd/pkg/tlsutil"
  49. "github.com/coreos/etcd/pkg/transport"
  50. "github.com/coreos/etcd/pkg/types"
  51. "github.com/soheilhy/cmux"
  52. "go.uber.org/zap"
  53. "golang.org/x/crypto/bcrypt"
  54. "google.golang.org/grpc"
  55. "google.golang.org/grpc/grpclog"
  56. "google.golang.org/grpc/keepalive"
  57. )
  58. const (
  59. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  60. RequestWaitTimeout = 3 * time.Second
  61. tickDuration = 10 * time.Millisecond
  62. requestTimeout = 20 * time.Second
  63. clusterName = "etcd"
  64. basePort = 21000
  65. UrlScheme = "unix"
  66. UrlSchemeTLS = "unixs"
  67. )
  68. var (
  69. electionTicks = 10
  70. // integration test uses unique ports, counting up, to listen for each
  71. // member, ensuring restarted members can listen on the same port again.
  72. localListenCount int64 = 0
  73. testTLSInfo = transport.TLSInfo{
  74. KeyFile: "./fixtures/server.key.insecure",
  75. CertFile: "./fixtures/server.crt",
  76. TrustedCAFile: "./fixtures/ca.crt",
  77. ClientCertAuth: true,
  78. }
  79. testTLSInfoIP = transport.TLSInfo{
  80. KeyFile: "./fixtures/server-ip.key.insecure",
  81. CertFile: "./fixtures/server-ip.crt",
  82. TrustedCAFile: "./fixtures/ca.crt",
  83. ClientCertAuth: true,
  84. }
  85. testTLSInfoExpired = transport.TLSInfo{
  86. KeyFile: "./fixtures-expired/server.key.insecure",
  87. CertFile: "./fixtures-expired/server.crt",
  88. TrustedCAFile: "./fixtures-expired/ca.crt",
  89. ClientCertAuth: true,
  90. }
  91. testTLSInfoExpiredIP = transport.TLSInfo{
  92. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  93. CertFile: "./fixtures-expired/server-ip.crt",
  94. TrustedCAFile: "./fixtures-expired/ca.crt",
  95. ClientCertAuth: true,
  96. }
  97. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  98. lg = zap.NewNop()
  99. )
  100. func init() {
  101. if os.Getenv("CLUSTER_DEBUG") != "" {
  102. lg, _ = zap.NewProduction()
  103. }
  104. }
  105. type ClusterConfig struct {
  106. Size int
  107. PeerTLS *transport.TLSInfo
  108. ClientTLS *transport.TLSInfo
  109. DiscoveryURL string
  110. AuthToken string
  111. UseGRPC bool
  112. QuotaBackendBytes int64
  113. MaxTxnOps uint
  114. MaxRequestBytes uint
  115. SnapshotCount uint64
  116. SnapshotCatchUpEntries uint64
  117. GRPCKeepAliveMinTime time.Duration
  118. GRPCKeepAliveInterval time.Duration
  119. GRPCKeepAliveTimeout time.Duration
  120. // SkipCreatingClient to skip creating clients for each member.
  121. SkipCreatingClient bool
  122. ClientMaxCallSendMsgSize int
  123. ClientMaxCallRecvMsgSize int
  124. // UseIP is true to use only IP for gRPC requests.
  125. UseIP bool
  126. }
  127. type cluster struct {
  128. cfg *ClusterConfig
  129. Members []*member
  130. }
  131. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  132. if tls == nil {
  133. return UrlScheme
  134. }
  135. return UrlSchemeTLS
  136. }
  137. func (c *cluster) fillClusterForMembers() error {
  138. if c.cfg.DiscoveryURL != "" {
  139. // cluster will be discovered
  140. return nil
  141. }
  142. addrs := make([]string, 0)
  143. for _, m := range c.Members {
  144. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  145. for _, l := range m.PeerListeners {
  146. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  147. }
  148. }
  149. clusterStr := strings.Join(addrs, ",")
  150. var err error
  151. for _, m := range c.Members {
  152. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  153. if err != nil {
  154. return err
  155. }
  156. }
  157. return nil
  158. }
  159. func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
  160. c := &cluster{cfg: cfg}
  161. ms := make([]*member, cfg.Size)
  162. for i := 0; i < cfg.Size; i++ {
  163. ms[i] = c.mustNewMember(t)
  164. }
  165. c.Members = ms
  166. if err := c.fillClusterForMembers(); err != nil {
  167. t.Fatal(err)
  168. }
  169. return c
  170. }
  171. // NewCluster returns an unlaunched cluster of the given size which has been
  172. // set to use static bootstrap.
  173. func NewCluster(t *testing.T, size int) *cluster {
  174. return newCluster(t, &ClusterConfig{Size: size})
  175. }
  176. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  177. func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
  178. return newCluster(t, cfg)
  179. }
  180. func (c *cluster) Launch(t *testing.T) {
  181. errc := make(chan error)
  182. for _, m := range c.Members {
  183. // Members are launched in separate goroutines because if they boot
  184. // using discovery url, they have to wait for others to register to continue.
  185. go func(m *member) {
  186. errc <- m.Launch()
  187. }(m)
  188. }
  189. for range c.Members {
  190. if err := <-errc; err != nil {
  191. t.Fatalf("error setting up member: %v", err)
  192. }
  193. }
  194. // wait cluster to be stable to receive future client requests
  195. c.waitMembersMatch(t, c.HTTPMembers())
  196. c.waitVersion()
  197. }
  198. func (c *cluster) URL(i int) string {
  199. return c.Members[i].ClientURLs[0].String()
  200. }
  201. // URLs returns a list of all active client URLs in the cluster
  202. func (c *cluster) URLs() []string {
  203. return getMembersURLs(c.Members)
  204. }
  205. func getMembersURLs(members []*member) []string {
  206. urls := make([]string, 0)
  207. for _, m := range members {
  208. select {
  209. case <-m.s.StopNotify():
  210. continue
  211. default:
  212. }
  213. for _, u := range m.ClientURLs {
  214. urls = append(urls, u.String())
  215. }
  216. }
  217. return urls
  218. }
  219. // HTTPMembers returns a list of all active members as client.Members
  220. func (c *cluster) HTTPMembers() []client.Member {
  221. ms := []client.Member{}
  222. for _, m := range c.Members {
  223. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  224. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  225. cm := client.Member{Name: m.Name}
  226. for _, ln := range m.PeerListeners {
  227. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  228. }
  229. for _, ln := range m.ClientListeners {
  230. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  231. }
  232. ms = append(ms, cm)
  233. }
  234. return ms
  235. }
  236. func (c *cluster) mustNewMember(t *testing.T) *member {
  237. m := mustNewMember(t,
  238. memberConfig{
  239. name: c.name(rand.Int()),
  240. authToken: c.cfg.AuthToken,
  241. peerTLS: c.cfg.PeerTLS,
  242. clientTLS: c.cfg.ClientTLS,
  243. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  244. maxTxnOps: c.cfg.MaxTxnOps,
  245. maxRequestBytes: c.cfg.MaxRequestBytes,
  246. snapshotCount: c.cfg.SnapshotCount,
  247. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  248. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  249. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  250. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  251. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  252. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  253. useIP: c.cfg.UseIP,
  254. })
  255. m.DiscoveryURL = c.cfg.DiscoveryURL
  256. if c.cfg.UseGRPC {
  257. if err := m.listenGRPC(); err != nil {
  258. t.Fatal(err)
  259. }
  260. }
  261. return m
  262. }
  263. func (c *cluster) addMember(t *testing.T) {
  264. m := c.mustNewMember(t)
  265. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  266. // send add request to the cluster
  267. var err error
  268. for i := 0; i < len(c.Members); i++ {
  269. clientURL := c.URL(i)
  270. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  271. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  272. break
  273. }
  274. }
  275. if err != nil {
  276. t.Fatalf("add member failed on all members error: %v", err)
  277. }
  278. m.InitialPeerURLsMap = types.URLsMap{}
  279. for _, mm := range c.Members {
  280. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  281. }
  282. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  283. m.NewCluster = false
  284. if err := m.Launch(); err != nil {
  285. t.Fatal(err)
  286. }
  287. c.Members = append(c.Members, m)
  288. // wait cluster to be stable to receive future client requests
  289. c.waitMembersMatch(t, c.HTTPMembers())
  290. }
  291. func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
  292. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  293. ma := client.NewMembersAPI(cc)
  294. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  295. _, err := ma.Add(ctx, peerURL)
  296. cancel()
  297. if err != nil {
  298. return err
  299. }
  300. // wait for the add node entry applied in the cluster
  301. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  302. c.waitMembersMatch(t, members)
  303. return nil
  304. }
  305. func (c *cluster) AddMember(t *testing.T) {
  306. c.addMember(t)
  307. }
  308. func (c *cluster) RemoveMember(t *testing.T, id uint64) {
  309. if err := c.removeMember(t, id); err != nil {
  310. t.Fatal(err)
  311. }
  312. }
  313. func (c *cluster) removeMember(t *testing.T, id uint64) error {
  314. // send remove request to the cluster
  315. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  316. ma := client.NewMembersAPI(cc)
  317. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  318. err := ma.Remove(ctx, types.ID(id).String())
  319. cancel()
  320. if err != nil {
  321. return err
  322. }
  323. newMembers := make([]*member, 0)
  324. for _, m := range c.Members {
  325. if uint64(m.s.ID()) != id {
  326. newMembers = append(newMembers, m)
  327. } else {
  328. select {
  329. case <-m.s.StopNotify():
  330. m.Terminate(t)
  331. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  332. // TODO: remove connection write timeout by selecting on http response closeNotifier
  333. // blocking on https://github.com/golang/go/issues/9524
  334. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  335. t.Fatalf("failed to remove member %s in time", m.s.ID())
  336. }
  337. }
  338. }
  339. c.Members = newMembers
  340. c.waitMembersMatch(t, c.HTTPMembers())
  341. return nil
  342. }
  343. func (c *cluster) Terminate(t *testing.T) {
  344. var wg sync.WaitGroup
  345. wg.Add(len(c.Members))
  346. for _, m := range c.Members {
  347. go func(mm *member) {
  348. defer wg.Done()
  349. mm.Terminate(t)
  350. }(m)
  351. }
  352. wg.Wait()
  353. }
  354. func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
  355. for _, u := range c.URLs() {
  356. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  357. ma := client.NewMembersAPI(cc)
  358. for {
  359. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  360. ms, err := ma.List(ctx)
  361. cancel()
  362. if err == nil && isMembersEqual(ms, membs) {
  363. break
  364. }
  365. time.Sleep(tickDuration)
  366. }
  367. }
  368. }
  369. func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }
  370. // waitLeader waits until given members agree on the same leader.
  371. func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
  372. possibleLead := make(map[uint64]bool)
  373. var lead uint64
  374. for _, m := range membs {
  375. possibleLead[uint64(m.s.ID())] = true
  376. }
  377. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  378. kapi := client.NewKeysAPI(cc)
  379. // ensure leader is up via linearizable get
  380. for {
  381. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  382. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  383. cancel()
  384. if err == nil || strings.Contains(err.Error(), "Key not found") {
  385. break
  386. }
  387. }
  388. for lead == 0 || !possibleLead[lead] {
  389. lead = 0
  390. for _, m := range membs {
  391. select {
  392. case <-m.s.StopNotify():
  393. continue
  394. default:
  395. }
  396. if lead != 0 && lead != m.s.Lead() {
  397. lead = 0
  398. time.Sleep(10 * tickDuration)
  399. break
  400. }
  401. lead = m.s.Lead()
  402. }
  403. }
  404. for i, m := range membs {
  405. if uint64(m.s.ID()) == lead {
  406. return i
  407. }
  408. }
  409. return -1
  410. }
  411. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  412. // waitNoLeader waits until given members lose leader.
  413. func (c *cluster) waitNoLeader(membs []*member) {
  414. noLeader := false
  415. for !noLeader {
  416. noLeader = true
  417. for _, m := range membs {
  418. select {
  419. case <-m.s.StopNotify():
  420. continue
  421. default:
  422. }
  423. if m.s.Lead() != 0 {
  424. noLeader = false
  425. time.Sleep(10 * tickDuration)
  426. break
  427. }
  428. }
  429. }
  430. }
  431. func (c *cluster) waitVersion() {
  432. for _, m := range c.Members {
  433. for {
  434. if m.s.ClusterVersion() != nil {
  435. break
  436. }
  437. time.Sleep(tickDuration)
  438. }
  439. }
  440. }
  441. func (c *cluster) name(i int) string {
  442. return fmt.Sprint(i)
  443. }
  444. // isMembersEqual checks whether two members equal except ID field.
  445. // The given wmembs should always set ID field to empty string.
  446. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  447. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  448. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  449. for i := range membs {
  450. membs[i].ID = ""
  451. }
  452. return reflect.DeepEqual(membs, wmembs)
  453. }
  454. func newLocalListener(t *testing.T) net.Listener {
  455. c := atomic.AddInt64(&localListenCount, 1)
  456. // Go 1.8+ allows only numbers in port
  457. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  458. return NewListenerWithAddr(t, addr)
  459. }
  460. func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
  461. l, err := transport.NewUnixListener(addr)
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. return l
  466. }
  467. type member struct {
  468. etcdserver.ServerConfig
  469. PeerListeners, ClientListeners []net.Listener
  470. grpcListener net.Listener
  471. // PeerTLSInfo enables peer TLS when set
  472. PeerTLSInfo *transport.TLSInfo
  473. // ClientTLSInfo enables client TLS when set
  474. ClientTLSInfo *transport.TLSInfo
  475. DialOptions []grpc.DialOption
  476. raftHandler *testutil.PauseableHandler
  477. s *etcdserver.EtcdServer
  478. serverClosers []func()
  479. grpcServerOpts []grpc.ServerOption
  480. grpcServer *grpc.Server
  481. grpcServerPeer *grpc.Server
  482. grpcAddr string
  483. grpcBridge *bridge
  484. // serverClient is a clientv3 that directly calls the etcdserver.
  485. serverClient *clientv3.Client
  486. keepDataDirTerminate bool
  487. clientMaxCallSendMsgSize int
  488. clientMaxCallRecvMsgSize int
  489. useIP bool
  490. }
  491. func (m *member) GRPCAddr() string { return m.grpcAddr }
  492. type memberConfig struct {
  493. name string
  494. peerTLS *transport.TLSInfo
  495. clientTLS *transport.TLSInfo
  496. authToken string
  497. quotaBackendBytes int64
  498. maxTxnOps uint
  499. maxRequestBytes uint
  500. snapshotCount uint64
  501. snapshotCatchUpEntries uint64
  502. grpcKeepAliveMinTime time.Duration
  503. grpcKeepAliveInterval time.Duration
  504. grpcKeepAliveTimeout time.Duration
  505. clientMaxCallSendMsgSize int
  506. clientMaxCallRecvMsgSize int
  507. useIP bool
  508. }
  509. // mustNewMember return an inited member with the given name. If peerTLS is
  510. // set, it will use https scheme to communicate between peers.
  511. func mustNewMember(t *testing.T, mcfg memberConfig) *member {
  512. var err error
  513. m := &member{}
  514. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  515. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  516. pln := newLocalListener(t)
  517. m.PeerListeners = []net.Listener{pln}
  518. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. m.PeerTLSInfo = mcfg.peerTLS
  523. cln := newLocalListener(t)
  524. m.ClientListeners = []net.Listener{cln}
  525. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  526. if err != nil {
  527. t.Fatal(err)
  528. }
  529. m.ClientTLSInfo = mcfg.clientTLS
  530. m.Name = mcfg.name
  531. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  532. if err != nil {
  533. t.Fatal(err)
  534. }
  535. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  536. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  537. if err != nil {
  538. t.Fatal(err)
  539. }
  540. m.InitialClusterToken = clusterName
  541. m.NewCluster = true
  542. m.BootstrapTimeout = 10 * time.Millisecond
  543. if m.PeerTLSInfo != nil {
  544. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  545. }
  546. m.ElectionTicks = electionTicks
  547. m.InitialElectionTickAdvance = true
  548. m.TickMs = uint(tickDuration / time.Millisecond)
  549. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  550. m.MaxTxnOps = mcfg.maxTxnOps
  551. if m.MaxTxnOps == 0 {
  552. m.MaxTxnOps = embed.DefaultMaxTxnOps
  553. }
  554. m.MaxRequestBytes = mcfg.maxRequestBytes
  555. if m.MaxRequestBytes == 0 {
  556. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  557. }
  558. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  559. if mcfg.snapshotCount != 0 {
  560. m.SnapshotCount = mcfg.snapshotCount
  561. }
  562. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  563. if mcfg.snapshotCatchUpEntries != 0 {
  564. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  565. }
  566. // for the purpose of integration testing, simple token is enough
  567. m.AuthToken = "simple"
  568. if mcfg.authToken != "" {
  569. m.AuthToken = mcfg.authToken
  570. }
  571. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  572. m.grpcServerOpts = []grpc.ServerOption{}
  573. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  574. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  575. MinTime: mcfg.grpcKeepAliveMinTime,
  576. PermitWithoutStream: false,
  577. }))
  578. }
  579. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  580. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  581. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  582. Time: mcfg.grpcKeepAliveInterval,
  583. Timeout: mcfg.grpcKeepAliveTimeout,
  584. }))
  585. }
  586. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  587. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  588. m.useIP = mcfg.useIP
  589. m.InitialCorruptCheck = true
  590. m.LoggerConfig = &zap.Config{
  591. Level: zap.NewAtomicLevelAt(zap.InfoLevel),
  592. Development: false,
  593. Sampling: &zap.SamplingConfig{
  594. Initial: 100,
  595. Thereafter: 100,
  596. },
  597. Encoding: "json",
  598. EncoderConfig: zap.NewProductionEncoderConfig(),
  599. OutputPaths: []string{"/dev/null"},
  600. ErrorOutputPaths: []string{"/dev/null"},
  601. }
  602. if os.Getenv("CLUSTER_DEBUG") != "" {
  603. m.LoggerConfig.OutputPaths = []string{"stderr"}
  604. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  605. }
  606. m.Logger, err = m.LoggerConfig.Build()
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. return m
  611. }
  612. // listenGRPC starts a grpc server over a unix domain socket on the member
  613. func (m *member) listenGRPC() error {
  614. // prefix with localhost so cert has right domain
  615. m.grpcAddr = "localhost:" + m.Name
  616. if m.useIP { // for IP-only TLS certs
  617. m.grpcAddr = "127.0.0.1:" + m.Name
  618. }
  619. l, err := transport.NewUnixListener(m.grpcAddr)
  620. if err != nil {
  621. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  622. }
  623. m.grpcBridge, err = newBridge(m.grpcAddr)
  624. if err != nil {
  625. l.Close()
  626. return err
  627. }
  628. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  629. m.grpcListener = l
  630. return nil
  631. }
  632. func (m *member) ElectionTimeout() time.Duration {
  633. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  634. }
  635. func (m *member) ID() types.ID { return m.s.ID() }
  636. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  637. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  638. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  639. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  640. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  641. // NewClientV3 creates a new grpc client connection to the member
  642. func NewClientV3(m *member) (*clientv3.Client, error) {
  643. if m.grpcAddr == "" {
  644. return nil, fmt.Errorf("member not configured for grpc")
  645. }
  646. cfg := clientv3.Config{
  647. Endpoints: []string{m.grpcAddr},
  648. DialTimeout: 5 * time.Second,
  649. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  650. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  651. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  652. }
  653. if m.ClientTLSInfo != nil {
  654. tls, err := m.ClientTLSInfo.ClientConfig()
  655. if err != nil {
  656. return nil, err
  657. }
  658. cfg.TLS = tls
  659. }
  660. if m.DialOptions != nil {
  661. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  662. }
  663. return newClientV3(cfg)
  664. }
  665. // Clone returns a member with the same server configuration. The returned
  666. // member will not set PeerListeners and ClientListeners.
  667. func (m *member) Clone(t *testing.T) *member {
  668. mm := &member{}
  669. mm.ServerConfig = m.ServerConfig
  670. var err error
  671. clientURLStrs := m.ClientURLs.StringSlice()
  672. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  673. if err != nil {
  674. // this should never fail
  675. panic(err)
  676. }
  677. peerURLStrs := m.PeerURLs.StringSlice()
  678. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  679. if err != nil {
  680. // this should never fail
  681. panic(err)
  682. }
  683. clusterStr := m.InitialPeerURLsMap.String()
  684. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  685. if err != nil {
  686. // this should never fail
  687. panic(err)
  688. }
  689. mm.InitialClusterToken = m.InitialClusterToken
  690. mm.ElectionTicks = m.ElectionTicks
  691. mm.PeerTLSInfo = m.PeerTLSInfo
  692. mm.ClientTLSInfo = m.ClientTLSInfo
  693. return mm
  694. }
  695. // Launch starts a member based on ServerConfig, PeerListeners
  696. // and ClientListeners.
  697. func (m *member) Launch() error {
  698. lg.Info(
  699. "launching a member",
  700. zap.String("name", m.Name),
  701. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  702. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  703. zap.String("grpc-address", m.grpcAddr),
  704. )
  705. var err error
  706. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  707. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  708. }
  709. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  710. m.s.Start()
  711. var peerTLScfg *tls.Config
  712. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  713. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  714. return err
  715. }
  716. }
  717. if m.grpcListener != nil {
  718. var (
  719. tlscfg *tls.Config
  720. )
  721. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  722. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  723. if err != nil {
  724. return err
  725. }
  726. }
  727. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  728. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  729. m.serverClient = v3client.New(m.s)
  730. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  731. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  732. go m.grpcServer.Serve(m.grpcListener)
  733. }
  734. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  735. h := (http.Handler)(m.raftHandler)
  736. if m.grpcListener != nil {
  737. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  738. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  739. m.grpcServerPeer.ServeHTTP(w, r)
  740. } else {
  741. m.raftHandler.ServeHTTP(w, r)
  742. }
  743. })
  744. }
  745. for _, ln := range m.PeerListeners {
  746. cm := cmux.New(ln)
  747. // don't hang on matcher after closing listener
  748. cm.SetReadTimeout(time.Second)
  749. if m.grpcServer != nil {
  750. grpcl := cm.Match(cmux.HTTP2())
  751. go m.grpcServerPeer.Serve(grpcl)
  752. }
  753. // serve http1/http2 rafthttp/grpc
  754. ll := cm.Match(cmux.Any())
  755. if peerTLScfg != nil {
  756. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  757. return err
  758. }
  759. }
  760. hs := &httptest.Server{
  761. Listener: ll,
  762. Config: &http.Server{
  763. Handler: h,
  764. TLSConfig: peerTLScfg,
  765. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  766. },
  767. TLS: peerTLScfg,
  768. }
  769. hs.Start()
  770. donec := make(chan struct{})
  771. go func() {
  772. defer close(donec)
  773. cm.Serve()
  774. }()
  775. closer := func() {
  776. ll.Close()
  777. hs.CloseClientConnections()
  778. hs.Close()
  779. <-donec
  780. }
  781. m.serverClosers = append(m.serverClosers, closer)
  782. }
  783. for _, ln := range m.ClientListeners {
  784. hs := &httptest.Server{
  785. Listener: ln,
  786. Config: &http.Server{
  787. Handler: v2http.NewClientHandler(
  788. m.Logger,
  789. m.s,
  790. m.ServerConfig.ReqTimeout(),
  791. ),
  792. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  793. },
  794. }
  795. if m.ClientTLSInfo == nil {
  796. hs.Start()
  797. } else {
  798. info := m.ClientTLSInfo
  799. hs.TLS, err = info.ServerConfig()
  800. if err != nil {
  801. return err
  802. }
  803. // baseConfig is called on initial TLS handshake start.
  804. //
  805. // Previously,
  806. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  807. // 2. Server calls (*tls.Config).GetCertificate iff:
  808. // - Server's (*tls.Config).Certificates is not empty, or
  809. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  810. //
  811. // When (*tls.Config).Certificates is always populated on initial handshake,
  812. // client is expected to provide a valid matching SNI to pass the TLS
  813. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  814. // TLS assets. However, a cert whose SAN field does not include domain names
  815. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  816. // it was never able to trigger TLS reload on initial handshake; first
  817. // certificate object was being used, never being updated.
  818. //
  819. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  820. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  821. // rest of the certificates on every new TLS connection, even when client
  822. // SNI is empty (e.g. cert only includes IPs).
  823. //
  824. // This introduces another problem with "httptest.Server":
  825. // when server initial certificates are empty, certificates
  826. // are overwritten by Go's internal test certs, which have
  827. // different SAN fields (e.g. example.com). To work around,
  828. // re-overwrite (*tls.Config).Certificates before starting
  829. // test server.
  830. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  831. if err != nil {
  832. return err
  833. }
  834. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  835. hs.StartTLS()
  836. }
  837. closer := func() {
  838. ln.Close()
  839. hs.CloseClientConnections()
  840. hs.Close()
  841. }
  842. m.serverClosers = append(m.serverClosers, closer)
  843. }
  844. lg.Info(
  845. "launched a member",
  846. zap.String("name", m.Name),
  847. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  848. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  849. zap.String("grpc-address", m.grpcAddr),
  850. )
  851. return nil
  852. }
  853. func (m *member) WaitOK(t *testing.T) {
  854. m.WaitStarted(t)
  855. for m.s.Leader() == 0 {
  856. time.Sleep(tickDuration)
  857. }
  858. }
  859. func (m *member) WaitStarted(t *testing.T) {
  860. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  861. kapi := client.NewKeysAPI(cc)
  862. for {
  863. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  864. _, err := kapi.Get(ctx, "/", nil)
  865. if err != nil {
  866. time.Sleep(tickDuration)
  867. continue
  868. }
  869. cancel()
  870. break
  871. }
  872. }
  873. func WaitClientV3(t *testing.T, kv clientv3.KV) {
  874. timeout := time.Now().Add(requestTimeout)
  875. var err error
  876. for time.Now().Before(timeout) {
  877. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  878. _, err = kv.Get(ctx, "/")
  879. cancel()
  880. if err == nil {
  881. return
  882. }
  883. time.Sleep(tickDuration)
  884. }
  885. if err != nil {
  886. t.Fatalf("timed out waiting for client: %v", err)
  887. }
  888. }
  889. func (m *member) URL() string { return m.ClientURLs[0].String() }
  890. func (m *member) Pause() {
  891. m.raftHandler.Pause()
  892. m.s.PauseSending()
  893. }
  894. func (m *member) Resume() {
  895. m.raftHandler.Resume()
  896. m.s.ResumeSending()
  897. }
  898. // Close stops the member's etcdserver and closes its connections
  899. func (m *member) Close() {
  900. if m.grpcBridge != nil {
  901. m.grpcBridge.Close()
  902. m.grpcBridge = nil
  903. }
  904. if m.serverClient != nil {
  905. m.serverClient.Close()
  906. m.serverClient = nil
  907. }
  908. if m.grpcServer != nil {
  909. m.grpcServer.Stop()
  910. m.grpcServer.GracefulStop()
  911. m.grpcServer = nil
  912. m.grpcServerPeer.Stop()
  913. m.grpcServerPeer.GracefulStop()
  914. m.grpcServerPeer = nil
  915. }
  916. m.s.HardStop()
  917. for _, f := range m.serverClosers {
  918. f()
  919. }
  920. }
  921. // Stop stops the member, but the data dir of the member is preserved.
  922. func (m *member) Stop(t *testing.T) {
  923. lg.Info(
  924. "stopping a member",
  925. zap.String("name", m.Name),
  926. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  927. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  928. zap.String("grpc-address", m.grpcAddr),
  929. )
  930. m.Close()
  931. m.serverClosers = nil
  932. lg.Info(
  933. "stopped a member",
  934. zap.String("name", m.Name),
  935. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  936. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  937. zap.String("grpc-address", m.grpcAddr),
  938. )
  939. }
  940. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  941. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  942. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  943. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  944. time.Sleep(interval)
  945. }
  946. return m.s.Lead()
  947. }
  948. // StopNotify unblocks when a member stop completes
  949. func (m *member) StopNotify() <-chan struct{} {
  950. return m.s.StopNotify()
  951. }
  952. // Restart starts the member using the preserved data dir.
  953. func (m *member) Restart(t *testing.T) error {
  954. lg.Info(
  955. "restarting a member",
  956. zap.String("name", m.Name),
  957. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  958. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  959. zap.String("grpc-address", m.grpcAddr),
  960. )
  961. newPeerListeners := make([]net.Listener, 0)
  962. for _, ln := range m.PeerListeners {
  963. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  964. }
  965. m.PeerListeners = newPeerListeners
  966. newClientListeners := make([]net.Listener, 0)
  967. for _, ln := range m.ClientListeners {
  968. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  969. }
  970. m.ClientListeners = newClientListeners
  971. if m.grpcListener != nil {
  972. if err := m.listenGRPC(); err != nil {
  973. t.Fatal(err)
  974. }
  975. }
  976. err := m.Launch()
  977. lg.Info(
  978. "restarted a member",
  979. zap.String("name", m.Name),
  980. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  981. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  982. zap.String("grpc-address", m.grpcAddr),
  983. zap.Error(err),
  984. )
  985. return err
  986. }
  987. // Terminate stops the member and removes the data dir.
  988. func (m *member) Terminate(t *testing.T) {
  989. lg.Info(
  990. "terminating a member",
  991. zap.String("name", m.Name),
  992. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  993. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  994. zap.String("grpc-address", m.grpcAddr),
  995. )
  996. m.Close()
  997. if !m.keepDataDirTerminate {
  998. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  999. t.Fatal(err)
  1000. }
  1001. }
  1002. lg.Info(
  1003. "terminated a member",
  1004. zap.String("name", m.Name),
  1005. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  1006. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  1007. zap.String("grpc-address", m.grpcAddr),
  1008. )
  1009. }
  1010. // Metric gets the metric value for a member
  1011. func (m *member) Metric(metricName string) (string, error) {
  1012. cfgtls := transport.TLSInfo{}
  1013. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  1014. if err != nil {
  1015. return "", err
  1016. }
  1017. cli := &http.Client{Transport: tr}
  1018. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1019. if err != nil {
  1020. return "", err
  1021. }
  1022. defer resp.Body.Close()
  1023. b, rerr := ioutil.ReadAll(resp.Body)
  1024. if rerr != nil {
  1025. return "", rerr
  1026. }
  1027. lines := strings.Split(string(b), "\n")
  1028. for _, l := range lines {
  1029. if strings.HasPrefix(l, metricName) {
  1030. return strings.Split(l, " ")[1], nil
  1031. }
  1032. }
  1033. return "", nil
  1034. }
  1035. // InjectPartition drops connections from m to others, vice versa.
  1036. func (m *member) InjectPartition(t *testing.T, others ...*member) {
  1037. for _, other := range others {
  1038. m.s.CutPeer(other.s.ID())
  1039. other.s.CutPeer(m.s.ID())
  1040. }
  1041. }
  1042. // RecoverPartition recovers connections from m to others, vice versa.
  1043. func (m *member) RecoverPartition(t *testing.T, others ...*member) {
  1044. for _, other := range others {
  1045. m.s.MendPeer(other.s.ID())
  1046. other.s.MendPeer(m.s.ID())
  1047. }
  1048. }
  1049. func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
  1050. cfgtls := transport.TLSInfo{}
  1051. if tls != nil {
  1052. cfgtls = *tls
  1053. }
  1054. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1055. c, err := client.New(cfg)
  1056. if err != nil {
  1057. t.Fatal(err)
  1058. }
  1059. return c
  1060. }
  1061. func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
  1062. // tick in integration test is short, so 1s dial timeout could play well.
  1063. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1064. if err != nil {
  1065. t.Fatal(err)
  1066. }
  1067. return tr
  1068. }
  1069. type SortableMemberSliceByPeerURLs []client.Member
  1070. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1071. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1072. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1073. }
  1074. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1075. type ClusterV3 struct {
  1076. *cluster
  1077. mu sync.Mutex
  1078. clients []*clientv3.Client
  1079. }
  1080. // NewClusterV3 returns a launched cluster with a grpc client connection
  1081. // for each cluster member.
  1082. func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1083. cfg.UseGRPC = true
  1084. if os.Getenv("CLIENT_DEBUG") != "" {
  1085. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1086. }
  1087. clus := &ClusterV3{
  1088. cluster: NewClusterByConfig(t, cfg),
  1089. }
  1090. clus.Launch(t)
  1091. if !cfg.SkipCreatingClient {
  1092. for _, m := range clus.Members {
  1093. client, err := NewClientV3(m)
  1094. if err != nil {
  1095. t.Fatalf("cannot create client: %v", err)
  1096. }
  1097. clus.clients = append(clus.clients, client)
  1098. }
  1099. }
  1100. return clus
  1101. }
  1102. func (c *ClusterV3) TakeClient(idx int) {
  1103. c.mu.Lock()
  1104. c.clients[idx] = nil
  1105. c.mu.Unlock()
  1106. }
  1107. func (c *ClusterV3) Terminate(t *testing.T) {
  1108. c.mu.Lock()
  1109. for _, client := range c.clients {
  1110. if client == nil {
  1111. continue
  1112. }
  1113. if err := client.Close(); err != nil {
  1114. t.Error(err)
  1115. }
  1116. }
  1117. c.mu.Unlock()
  1118. c.cluster.Terminate(t)
  1119. }
  1120. func (c *ClusterV3) RandClient() *clientv3.Client {
  1121. return c.clients[rand.Intn(len(c.clients))]
  1122. }
  1123. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1124. return c.clients[i]
  1125. }
  1126. type grpcAPI struct {
  1127. // Cluster is the cluster API for the client's connection.
  1128. Cluster pb.ClusterClient
  1129. // KV is the keyvalue API for the client's connection.
  1130. KV pb.KVClient
  1131. // Lease is the lease API for the client's connection.
  1132. Lease pb.LeaseClient
  1133. // Watch is the watch API for the client's connection.
  1134. Watch pb.WatchClient
  1135. // Maintenance is the maintenance API for the client's connection.
  1136. Maintenance pb.MaintenanceClient
  1137. // Auth is the authentication API for the client's connection.
  1138. Auth pb.AuthClient
  1139. // Lock is the lock API for the client's connection.
  1140. Lock lockpb.LockClient
  1141. // Election is the election API for the client's connection.
  1142. Election epb.ElectionClient
  1143. }