cluster.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "go.etcd.io/etcd/client"
  34. "go.etcd.io/etcd/clientv3"
  35. "go.etcd.io/etcd/embed"
  36. "go.etcd.io/etcd/etcdserver"
  37. "go.etcd.io/etcd/etcdserver/api/etcdhttp"
  38. "go.etcd.io/etcd/etcdserver/api/rafthttp"
  39. "go.etcd.io/etcd/etcdserver/api/v2http"
  40. "go.etcd.io/etcd/etcdserver/api/v3client"
  41. "go.etcd.io/etcd/etcdserver/api/v3election"
  42. epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  43. "go.etcd.io/etcd/etcdserver/api/v3lock"
  44. lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  46. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  47. "go.etcd.io/etcd/pkg/logutil"
  48. "go.etcd.io/etcd/pkg/testutil"
  49. "go.etcd.io/etcd/pkg/tlsutil"
  50. "go.etcd.io/etcd/pkg/transport"
  51. "go.etcd.io/etcd/pkg/types"
  52. "github.com/soheilhy/cmux"
  53. "go.uber.org/zap"
  54. "golang.org/x/crypto/bcrypt"
  55. "google.golang.org/grpc"
  56. "google.golang.org/grpc/grpclog"
  57. "google.golang.org/grpc/keepalive"
  58. )
  59. const (
  60. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  61. RequestWaitTimeout = 3 * time.Second
  62. tickDuration = 10 * time.Millisecond
  63. requestTimeout = 20 * time.Second
  64. clusterName = "etcd"
  65. basePort = 21000
  66. URLScheme = "unix"
  67. URLSchemeTLS = "unixs"
  68. )
  69. var (
  70. electionTicks = 10
  71. // integration test uses unique ports, counting up, to listen for each
  72. // member, ensuring restarted members can listen on the same port again.
  73. localListenCount = int64(0)
  74. testTLSInfo = transport.TLSInfo{
  75. KeyFile: "./fixtures/server.key.insecure",
  76. CertFile: "./fixtures/server.crt",
  77. TrustedCAFile: "./fixtures/ca.crt",
  78. ClientCertAuth: true,
  79. }
  80. testTLSInfoIP = transport.TLSInfo{
  81. KeyFile: "./fixtures/server-ip.key.insecure",
  82. CertFile: "./fixtures/server-ip.crt",
  83. TrustedCAFile: "./fixtures/ca.crt",
  84. ClientCertAuth: true,
  85. }
  86. testTLSInfoExpired = transport.TLSInfo{
  87. KeyFile: "./fixtures-expired/server.key.insecure",
  88. CertFile: "./fixtures-expired/server.crt",
  89. TrustedCAFile: "./fixtures-expired/ca.crt",
  90. ClientCertAuth: true,
  91. }
  92. testTLSInfoExpiredIP = transport.TLSInfo{
  93. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  94. CertFile: "./fixtures-expired/server-ip.crt",
  95. TrustedCAFile: "./fixtures-expired/ca.crt",
  96. ClientCertAuth: true,
  97. }
  98. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  99. lg = zap.NewNop()
  100. )
  101. func init() {
  102. if os.Getenv("CLUSTER_DEBUG") != "" {
  103. lg, _ = zap.NewProduction()
  104. }
  105. }
  106. type ClusterConfig struct {
  107. Size int
  108. PeerTLS *transport.TLSInfo
  109. ClientTLS *transport.TLSInfo
  110. DiscoveryURL string
  111. AuthToken string
  112. UseGRPC bool
  113. QuotaBackendBytes int64
  114. MaxTxnOps uint
  115. MaxRequestBytes uint
  116. SnapshotCount uint64
  117. SnapshotCatchUpEntries uint64
  118. GRPCKeepAliveMinTime time.Duration
  119. GRPCKeepAliveInterval time.Duration
  120. GRPCKeepAliveTimeout time.Duration
  121. // SkipCreatingClient to skip creating clients for each member.
  122. SkipCreatingClient bool
  123. ClientMaxCallSendMsgSize int
  124. ClientMaxCallRecvMsgSize int
  125. // UseIP is true to use only IP for gRPC requests.
  126. UseIP bool
  127. EnableLeaseCheckpoint bool
  128. LeaseCheckpointInterval time.Duration
  129. }
  130. type cluster struct {
  131. cfg *ClusterConfig
  132. Members []*member
  133. }
  134. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  135. if tls == nil {
  136. return URLScheme
  137. }
  138. return URLSchemeTLS
  139. }
  140. func (c *cluster) fillClusterForMembers() error {
  141. if c.cfg.DiscoveryURL != "" {
  142. // cluster will be discovered
  143. return nil
  144. }
  145. addrs := make([]string, 0)
  146. for _, m := range c.Members {
  147. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  148. for _, l := range m.PeerListeners {
  149. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  150. }
  151. }
  152. clusterStr := strings.Join(addrs, ",")
  153. var err error
  154. for _, m := range c.Members {
  155. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  156. if err != nil {
  157. return err
  158. }
  159. }
  160. return nil
  161. }
  162. func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
  163. c := &cluster{cfg: cfg}
  164. ms := make([]*member, cfg.Size)
  165. for i := 0; i < cfg.Size; i++ {
  166. ms[i] = c.mustNewMember(t)
  167. }
  168. c.Members = ms
  169. if err := c.fillClusterForMembers(); err != nil {
  170. t.Fatal(err)
  171. }
  172. return c
  173. }
  174. // NewCluster returns an unlaunched cluster of the given size which has been
  175. // set to use static bootstrap.
  176. func NewCluster(t testing.TB, size int) *cluster {
  177. return newCluster(t, &ClusterConfig{Size: size})
  178. }
  179. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  180. func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
  181. return newCluster(t, cfg)
  182. }
  183. func (c *cluster) Launch(t testing.TB) {
  184. errc := make(chan error)
  185. for _, m := range c.Members {
  186. // Members are launched in separate goroutines because if they boot
  187. // using discovery url, they have to wait for others to register to continue.
  188. go func(m *member) {
  189. errc <- m.Launch()
  190. }(m)
  191. }
  192. for range c.Members {
  193. if err := <-errc; err != nil {
  194. t.Fatalf("error setting up member: %v", err)
  195. }
  196. }
  197. // wait cluster to be stable to receive future client requests
  198. c.waitMembersMatch(t, c.HTTPMembers())
  199. c.waitVersion()
  200. }
  201. func (c *cluster) URL(i int) string {
  202. return c.Members[i].ClientURLs[0].String()
  203. }
  204. // URLs returns a list of all active client URLs in the cluster
  205. func (c *cluster) URLs() []string {
  206. return getMembersURLs(c.Members)
  207. }
  208. func getMembersURLs(members []*member) []string {
  209. urls := make([]string, 0)
  210. for _, m := range members {
  211. select {
  212. case <-m.s.StopNotify():
  213. continue
  214. default:
  215. }
  216. for _, u := range m.ClientURLs {
  217. urls = append(urls, u.String())
  218. }
  219. }
  220. return urls
  221. }
  222. // HTTPMembers returns a list of all active members as client.Members
  223. func (c *cluster) HTTPMembers() []client.Member {
  224. ms := []client.Member{}
  225. for _, m := range c.Members {
  226. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  227. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  228. cm := client.Member{Name: m.Name}
  229. for _, ln := range m.PeerListeners {
  230. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  231. }
  232. for _, ln := range m.ClientListeners {
  233. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  234. }
  235. ms = append(ms, cm)
  236. }
  237. return ms
  238. }
  239. func (c *cluster) mustNewMember(t testing.TB) *member {
  240. m := mustNewMember(t,
  241. memberConfig{
  242. name: c.name(rand.Int()),
  243. authToken: c.cfg.AuthToken,
  244. peerTLS: c.cfg.PeerTLS,
  245. clientTLS: c.cfg.ClientTLS,
  246. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  247. maxTxnOps: c.cfg.MaxTxnOps,
  248. maxRequestBytes: c.cfg.MaxRequestBytes,
  249. snapshotCount: c.cfg.SnapshotCount,
  250. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  251. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  252. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  253. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  254. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  255. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  256. useIP: c.cfg.UseIP,
  257. enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
  258. leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
  259. })
  260. m.DiscoveryURL = c.cfg.DiscoveryURL
  261. if c.cfg.UseGRPC {
  262. if err := m.listenGRPC(); err != nil {
  263. t.Fatal(err)
  264. }
  265. }
  266. return m
  267. }
  268. func (c *cluster) addMember(t testing.TB) {
  269. m := c.mustNewMember(t)
  270. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  271. // send add request to the cluster
  272. var err error
  273. for i := 0; i < len(c.Members); i++ {
  274. clientURL := c.URL(i)
  275. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  276. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  277. break
  278. }
  279. }
  280. if err != nil {
  281. t.Fatalf("add member failed on all members error: %v", err)
  282. }
  283. m.InitialPeerURLsMap = types.URLsMap{}
  284. for _, mm := range c.Members {
  285. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  286. }
  287. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  288. m.NewCluster = false
  289. if err := m.Launch(); err != nil {
  290. t.Fatal(err)
  291. }
  292. c.Members = append(c.Members, m)
  293. // wait cluster to be stable to receive future client requests
  294. c.waitMembersMatch(t, c.HTTPMembers())
  295. }
  296. func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
  297. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  298. ma := client.NewMembersAPI(cc)
  299. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  300. _, err := ma.Add(ctx, peerURL)
  301. cancel()
  302. if err != nil {
  303. return err
  304. }
  305. // wait for the add node entry applied in the cluster
  306. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  307. c.waitMembersMatch(t, members)
  308. return nil
  309. }
  310. func (c *cluster) AddMember(t testing.TB) {
  311. c.addMember(t)
  312. }
  313. func (c *cluster) RemoveMember(t testing.TB, id uint64) {
  314. if err := c.removeMember(t, id); err != nil {
  315. t.Fatal(err)
  316. }
  317. }
  318. func (c *cluster) removeMember(t testing.TB, id uint64) error {
  319. // send remove request to the cluster
  320. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  321. ma := client.NewMembersAPI(cc)
  322. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  323. err := ma.Remove(ctx, types.ID(id).String())
  324. cancel()
  325. if err != nil {
  326. return err
  327. }
  328. newMembers := make([]*member, 0)
  329. for _, m := range c.Members {
  330. if uint64(m.s.ID()) != id {
  331. newMembers = append(newMembers, m)
  332. } else {
  333. select {
  334. case <-m.s.StopNotify():
  335. m.Terminate(t)
  336. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  337. // TODO: remove connection write timeout by selecting on http response closeNotifier
  338. // blocking on https://github.com/golang/go/issues/9524
  339. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  340. t.Fatalf("failed to remove member %s in time", m.s.ID())
  341. }
  342. }
  343. }
  344. c.Members = newMembers
  345. c.waitMembersMatch(t, c.HTTPMembers())
  346. return nil
  347. }
  348. func (c *cluster) Terminate(t testing.TB) {
  349. var wg sync.WaitGroup
  350. wg.Add(len(c.Members))
  351. for _, m := range c.Members {
  352. go func(mm *member) {
  353. defer wg.Done()
  354. mm.Terminate(t)
  355. }(m)
  356. }
  357. wg.Wait()
  358. }
  359. func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
  360. for _, u := range c.URLs() {
  361. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  362. ma := client.NewMembersAPI(cc)
  363. for {
  364. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  365. ms, err := ma.List(ctx)
  366. cancel()
  367. if err == nil && isMembersEqual(ms, membs) {
  368. break
  369. }
  370. time.Sleep(tickDuration)
  371. }
  372. }
  373. }
  374. func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
  375. // waitLeader waits until given members agree on the same leader.
  376. func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
  377. possibleLead := make(map[uint64]bool)
  378. var lead uint64
  379. for _, m := range membs {
  380. possibleLead[uint64(m.s.ID())] = true
  381. }
  382. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  383. kapi := client.NewKeysAPI(cc)
  384. // ensure leader is up via linearizable get
  385. for {
  386. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  387. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  388. cancel()
  389. if err == nil || strings.Contains(err.Error(), "Key not found") {
  390. break
  391. }
  392. }
  393. for lead == 0 || !possibleLead[lead] {
  394. lead = 0
  395. for _, m := range membs {
  396. select {
  397. case <-m.s.StopNotify():
  398. continue
  399. default:
  400. }
  401. if lead != 0 && lead != m.s.Lead() {
  402. lead = 0
  403. time.Sleep(10 * tickDuration)
  404. break
  405. }
  406. lead = m.s.Lead()
  407. }
  408. }
  409. for i, m := range membs {
  410. if uint64(m.s.ID()) == lead {
  411. return i
  412. }
  413. }
  414. return -1
  415. }
  416. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  417. // waitNoLeader waits until given members lose leader.
  418. func (c *cluster) waitNoLeader(membs []*member) {
  419. noLeader := false
  420. for !noLeader {
  421. noLeader = true
  422. for _, m := range membs {
  423. select {
  424. case <-m.s.StopNotify():
  425. continue
  426. default:
  427. }
  428. if m.s.Lead() != 0 {
  429. noLeader = false
  430. time.Sleep(10 * tickDuration)
  431. break
  432. }
  433. }
  434. }
  435. }
  436. func (c *cluster) waitVersion() {
  437. for _, m := range c.Members {
  438. for {
  439. if m.s.ClusterVersion() != nil {
  440. break
  441. }
  442. time.Sleep(tickDuration)
  443. }
  444. }
  445. }
  446. func (c *cluster) name(i int) string {
  447. return fmt.Sprint(i)
  448. }
  449. // isMembersEqual checks whether two members equal except ID field.
  450. // The given wmembs should always set ID field to empty string.
  451. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  452. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  453. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  454. for i := range membs {
  455. membs[i].ID = ""
  456. }
  457. return reflect.DeepEqual(membs, wmembs)
  458. }
  459. func newLocalListener(t testing.TB) net.Listener {
  460. c := atomic.AddInt64(&localListenCount, 1)
  461. // Go 1.8+ allows only numbers in port
  462. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  463. return NewListenerWithAddr(t, addr)
  464. }
  465. func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
  466. l, err := transport.NewUnixListener(addr)
  467. if err != nil {
  468. t.Fatal(err)
  469. }
  470. return l
  471. }
  472. type member struct {
  473. etcdserver.ServerConfig
  474. PeerListeners, ClientListeners []net.Listener
  475. grpcListener net.Listener
  476. // PeerTLSInfo enables peer TLS when set
  477. PeerTLSInfo *transport.TLSInfo
  478. // ClientTLSInfo enables client TLS when set
  479. ClientTLSInfo *transport.TLSInfo
  480. DialOptions []grpc.DialOption
  481. raftHandler *testutil.PauseableHandler
  482. s *etcdserver.EtcdServer
  483. serverClosers []func()
  484. grpcServerOpts []grpc.ServerOption
  485. grpcServer *grpc.Server
  486. grpcServerPeer *grpc.Server
  487. grpcAddr string
  488. grpcBridge *bridge
  489. // serverClient is a clientv3 that directly calls the etcdserver.
  490. serverClient *clientv3.Client
  491. keepDataDirTerminate bool
  492. clientMaxCallSendMsgSize int
  493. clientMaxCallRecvMsgSize int
  494. useIP bool
  495. isLearner bool
  496. }
  497. func (m *member) GRPCAddr() string { return m.grpcAddr }
  498. type memberConfig struct {
  499. name string
  500. peerTLS *transport.TLSInfo
  501. clientTLS *transport.TLSInfo
  502. authToken string
  503. quotaBackendBytes int64
  504. maxTxnOps uint
  505. maxRequestBytes uint
  506. snapshotCount uint64
  507. snapshotCatchUpEntries uint64
  508. grpcKeepAliveMinTime time.Duration
  509. grpcKeepAliveInterval time.Duration
  510. grpcKeepAliveTimeout time.Duration
  511. clientMaxCallSendMsgSize int
  512. clientMaxCallRecvMsgSize int
  513. useIP bool
  514. enableLeaseCheckpoint bool
  515. leaseCheckpointInterval time.Duration
  516. }
  517. // mustNewMember return an inited member with the given name. If peerTLS is
  518. // set, it will use https scheme to communicate between peers.
  519. func mustNewMember(t testing.TB, mcfg memberConfig) *member {
  520. var err error
  521. m := &member{}
  522. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  523. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  524. pln := newLocalListener(t)
  525. m.PeerListeners = []net.Listener{pln}
  526. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. m.PeerTLSInfo = mcfg.peerTLS
  531. cln := newLocalListener(t)
  532. m.ClientListeners = []net.Listener{cln}
  533. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  534. if err != nil {
  535. t.Fatal(err)
  536. }
  537. m.ClientTLSInfo = mcfg.clientTLS
  538. m.Name = mcfg.name
  539. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  540. if err != nil {
  541. t.Fatal(err)
  542. }
  543. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  544. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  545. if err != nil {
  546. t.Fatal(err)
  547. }
  548. m.InitialClusterToken = clusterName
  549. m.NewCluster = true
  550. m.BootstrapTimeout = 10 * time.Millisecond
  551. if m.PeerTLSInfo != nil {
  552. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  553. }
  554. m.ElectionTicks = electionTicks
  555. m.InitialElectionTickAdvance = true
  556. m.TickMs = uint(tickDuration / time.Millisecond)
  557. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  558. m.MaxTxnOps = mcfg.maxTxnOps
  559. if m.MaxTxnOps == 0 {
  560. m.MaxTxnOps = embed.DefaultMaxTxnOps
  561. }
  562. m.MaxRequestBytes = mcfg.maxRequestBytes
  563. if m.MaxRequestBytes == 0 {
  564. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  565. }
  566. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  567. if mcfg.snapshotCount != 0 {
  568. m.SnapshotCount = mcfg.snapshotCount
  569. }
  570. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  571. if mcfg.snapshotCatchUpEntries != 0 {
  572. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  573. }
  574. // for the purpose of integration testing, simple token is enough
  575. m.AuthToken = "simple"
  576. if mcfg.authToken != "" {
  577. m.AuthToken = mcfg.authToken
  578. }
  579. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  580. m.grpcServerOpts = []grpc.ServerOption{}
  581. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  582. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  583. MinTime: mcfg.grpcKeepAliveMinTime,
  584. PermitWithoutStream: false,
  585. }))
  586. }
  587. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  588. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  589. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  590. Time: mcfg.grpcKeepAliveInterval,
  591. Timeout: mcfg.grpcKeepAliveTimeout,
  592. }))
  593. }
  594. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  595. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  596. m.useIP = mcfg.useIP
  597. m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
  598. m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
  599. m.InitialCorruptCheck = true
  600. lcfg := logutil.DefaultZapLoggerConfig
  601. m.LoggerConfig = &lcfg
  602. m.LoggerConfig.OutputPaths = []string{"/dev/null"}
  603. m.LoggerConfig.ErrorOutputPaths = []string{"/dev/null"}
  604. if os.Getenv("CLUSTER_DEBUG") != "" {
  605. m.LoggerConfig.OutputPaths = []string{"stderr"}
  606. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  607. }
  608. m.Logger, err = m.LoggerConfig.Build()
  609. if err != nil {
  610. t.Fatal(err)
  611. }
  612. return m
  613. }
  614. // listenGRPC starts a grpc server over a unix domain socket on the member
  615. func (m *member) listenGRPC() error {
  616. // prefix with localhost so cert has right domain
  617. m.grpcAddr = "localhost:" + m.Name
  618. if m.useIP { // for IP-only TLS certs
  619. m.grpcAddr = "127.0.0.1:" + m.Name
  620. }
  621. l, err := transport.NewUnixListener(m.grpcAddr)
  622. if err != nil {
  623. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  624. }
  625. m.grpcBridge, err = newBridge(m.grpcAddr)
  626. if err != nil {
  627. l.Close()
  628. return err
  629. }
  630. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  631. m.grpcListener = l
  632. return nil
  633. }
  634. func (m *member) ElectionTimeout() time.Duration {
  635. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  636. }
  637. func (m *member) ID() types.ID { return m.s.ID() }
  638. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  639. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  640. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  641. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  642. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  643. // NewClientV3 creates a new grpc client connection to the member
  644. func NewClientV3(m *member) (*clientv3.Client, error) {
  645. if m.grpcAddr == "" {
  646. return nil, fmt.Errorf("member not configured for grpc")
  647. }
  648. cfg := clientv3.Config{
  649. Endpoints: []string{m.grpcAddr},
  650. DialTimeout: 5 * time.Second,
  651. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  652. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  653. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  654. }
  655. if m.ClientTLSInfo != nil {
  656. tls, err := m.ClientTLSInfo.ClientConfig()
  657. if err != nil {
  658. return nil, err
  659. }
  660. cfg.TLS = tls
  661. }
  662. if m.DialOptions != nil {
  663. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  664. }
  665. return newClientV3(cfg)
  666. }
  667. // Clone returns a member with the same server configuration. The returned
  668. // member will not set PeerListeners and ClientListeners.
  669. func (m *member) Clone(t testing.TB) *member {
  670. mm := &member{}
  671. mm.ServerConfig = m.ServerConfig
  672. var err error
  673. clientURLStrs := m.ClientURLs.StringSlice()
  674. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  675. if err != nil {
  676. // this should never fail
  677. panic(err)
  678. }
  679. peerURLStrs := m.PeerURLs.StringSlice()
  680. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  681. if err != nil {
  682. // this should never fail
  683. panic(err)
  684. }
  685. clusterStr := m.InitialPeerURLsMap.String()
  686. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  687. if err != nil {
  688. // this should never fail
  689. panic(err)
  690. }
  691. mm.InitialClusterToken = m.InitialClusterToken
  692. mm.ElectionTicks = m.ElectionTicks
  693. mm.PeerTLSInfo = m.PeerTLSInfo
  694. mm.ClientTLSInfo = m.ClientTLSInfo
  695. return mm
  696. }
  697. // Launch starts a member based on ServerConfig, PeerListeners
  698. // and ClientListeners.
  699. func (m *member) Launch() error {
  700. lg.Info(
  701. "launching a member",
  702. zap.String("name", m.Name),
  703. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  704. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  705. zap.String("grpc-address", m.grpcAddr),
  706. )
  707. var err error
  708. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  709. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  710. }
  711. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  712. m.s.Start()
  713. var peerTLScfg *tls.Config
  714. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  715. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  716. return err
  717. }
  718. }
  719. if m.grpcListener != nil {
  720. var (
  721. tlscfg *tls.Config
  722. )
  723. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  724. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  725. if err != nil {
  726. return err
  727. }
  728. }
  729. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  730. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  731. m.serverClient = v3client.New(m.s)
  732. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  733. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  734. go m.grpcServer.Serve(m.grpcListener)
  735. }
  736. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  737. h := (http.Handler)(m.raftHandler)
  738. if m.grpcListener != nil {
  739. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  740. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  741. m.grpcServerPeer.ServeHTTP(w, r)
  742. } else {
  743. m.raftHandler.ServeHTTP(w, r)
  744. }
  745. })
  746. }
  747. for _, ln := range m.PeerListeners {
  748. cm := cmux.New(ln)
  749. // don't hang on matcher after closing listener
  750. cm.SetReadTimeout(time.Second)
  751. if m.grpcServer != nil {
  752. grpcl := cm.Match(cmux.HTTP2())
  753. go m.grpcServerPeer.Serve(grpcl)
  754. }
  755. // serve http1/http2 rafthttp/grpc
  756. ll := cm.Match(cmux.Any())
  757. if peerTLScfg != nil {
  758. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  759. return err
  760. }
  761. }
  762. hs := &httptest.Server{
  763. Listener: ll,
  764. Config: &http.Server{
  765. Handler: h,
  766. TLSConfig: peerTLScfg,
  767. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  768. },
  769. TLS: peerTLScfg,
  770. }
  771. hs.Start()
  772. donec := make(chan struct{})
  773. go func() {
  774. defer close(donec)
  775. cm.Serve()
  776. }()
  777. closer := func() {
  778. ll.Close()
  779. hs.CloseClientConnections()
  780. hs.Close()
  781. <-donec
  782. }
  783. m.serverClosers = append(m.serverClosers, closer)
  784. }
  785. for _, ln := range m.ClientListeners {
  786. hs := &httptest.Server{
  787. Listener: ln,
  788. Config: &http.Server{
  789. Handler: v2http.NewClientHandler(
  790. m.Logger,
  791. m.s,
  792. m.ServerConfig.ReqTimeout(),
  793. ),
  794. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  795. },
  796. }
  797. if m.ClientTLSInfo == nil {
  798. hs.Start()
  799. } else {
  800. info := m.ClientTLSInfo
  801. hs.TLS, err = info.ServerConfig()
  802. if err != nil {
  803. return err
  804. }
  805. // baseConfig is called on initial TLS handshake start.
  806. //
  807. // Previously,
  808. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  809. // 2. Server calls (*tls.Config).GetCertificate iff:
  810. // - Server's (*tls.Config).Certificates is not empty, or
  811. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  812. //
  813. // When (*tls.Config).Certificates is always populated on initial handshake,
  814. // client is expected to provide a valid matching SNI to pass the TLS
  815. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  816. // TLS assets. However, a cert whose SAN field does not include domain names
  817. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  818. // it was never able to trigger TLS reload on initial handshake; first
  819. // certificate object was being used, never being updated.
  820. //
  821. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  822. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  823. // rest of the certificates on every new TLS connection, even when client
  824. // SNI is empty (e.g. cert only includes IPs).
  825. //
  826. // This introduces another problem with "httptest.Server":
  827. // when server initial certificates are empty, certificates
  828. // are overwritten by Go's internal test certs, which have
  829. // different SAN fields (e.g. example.com). To work around,
  830. // re-overwrite (*tls.Config).Certificates before starting
  831. // test server.
  832. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  833. if err != nil {
  834. return err
  835. }
  836. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  837. hs.StartTLS()
  838. }
  839. closer := func() {
  840. ln.Close()
  841. hs.CloseClientConnections()
  842. hs.Close()
  843. }
  844. m.serverClosers = append(m.serverClosers, closer)
  845. }
  846. lg.Info(
  847. "launched a member",
  848. zap.String("name", m.Name),
  849. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  850. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  851. zap.String("grpc-address", m.grpcAddr),
  852. )
  853. return nil
  854. }
  855. func (m *member) WaitOK(t testing.TB) {
  856. m.WaitStarted(t)
  857. for m.s.Leader() == 0 {
  858. time.Sleep(tickDuration)
  859. }
  860. }
  861. func (m *member) WaitStarted(t testing.TB) {
  862. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  863. kapi := client.NewKeysAPI(cc)
  864. for {
  865. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  866. _, err := kapi.Get(ctx, "/", nil)
  867. if err != nil {
  868. time.Sleep(tickDuration)
  869. continue
  870. }
  871. cancel()
  872. break
  873. }
  874. }
  875. func WaitClientV3(t testing.TB, kv clientv3.KV) {
  876. timeout := time.Now().Add(requestTimeout)
  877. var err error
  878. for time.Now().Before(timeout) {
  879. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  880. _, err = kv.Get(ctx, "/")
  881. cancel()
  882. if err == nil {
  883. return
  884. }
  885. time.Sleep(tickDuration)
  886. }
  887. if err != nil {
  888. t.Fatalf("timed out waiting for client: %v", err)
  889. }
  890. }
  891. func (m *member) URL() string { return m.ClientURLs[0].String() }
  892. func (m *member) Pause() {
  893. m.raftHandler.Pause()
  894. m.s.PauseSending()
  895. }
  896. func (m *member) Resume() {
  897. m.raftHandler.Resume()
  898. m.s.ResumeSending()
  899. }
  900. // Close stops the member's etcdserver and closes its connections
  901. func (m *member) Close() {
  902. if m.grpcBridge != nil {
  903. m.grpcBridge.Close()
  904. m.grpcBridge = nil
  905. }
  906. if m.serverClient != nil {
  907. m.serverClient.Close()
  908. m.serverClient = nil
  909. }
  910. if m.grpcServer != nil {
  911. m.grpcServer.Stop()
  912. m.grpcServer.GracefulStop()
  913. m.grpcServer = nil
  914. m.grpcServerPeer.Stop()
  915. m.grpcServerPeer.GracefulStop()
  916. m.grpcServerPeer = nil
  917. }
  918. m.s.HardStop()
  919. for _, f := range m.serverClosers {
  920. f()
  921. }
  922. }
  923. // Stop stops the member, but the data dir of the member is preserved.
  924. func (m *member) Stop(t testing.TB) {
  925. lg.Info(
  926. "stopping a member",
  927. zap.String("name", m.Name),
  928. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  929. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  930. zap.String("grpc-address", m.grpcAddr),
  931. )
  932. m.Close()
  933. m.serverClosers = nil
  934. lg.Info(
  935. "stopped a member",
  936. zap.String("name", m.Name),
  937. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  938. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  939. zap.String("grpc-address", m.grpcAddr),
  940. )
  941. }
  942. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  943. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  944. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  945. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  946. time.Sleep(interval)
  947. }
  948. return m.s.Lead()
  949. }
  950. // StopNotify unblocks when a member stop completes
  951. func (m *member) StopNotify() <-chan struct{} {
  952. return m.s.StopNotify()
  953. }
  954. // Restart starts the member using the preserved data dir.
  955. func (m *member) Restart(t testing.TB) error {
  956. lg.Info(
  957. "restarting a member",
  958. zap.String("name", m.Name),
  959. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  960. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  961. zap.String("grpc-address", m.grpcAddr),
  962. )
  963. newPeerListeners := make([]net.Listener, 0)
  964. for _, ln := range m.PeerListeners {
  965. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  966. }
  967. m.PeerListeners = newPeerListeners
  968. newClientListeners := make([]net.Listener, 0)
  969. for _, ln := range m.ClientListeners {
  970. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  971. }
  972. m.ClientListeners = newClientListeners
  973. if m.grpcListener != nil {
  974. if err := m.listenGRPC(); err != nil {
  975. t.Fatal(err)
  976. }
  977. }
  978. err := m.Launch()
  979. lg.Info(
  980. "restarted a member",
  981. zap.String("name", m.Name),
  982. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  983. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  984. zap.String("grpc-address", m.grpcAddr),
  985. zap.Error(err),
  986. )
  987. return err
  988. }
  989. // Terminate stops the member and removes the data dir.
  990. func (m *member) Terminate(t testing.TB) {
  991. lg.Info(
  992. "terminating a member",
  993. zap.String("name", m.Name),
  994. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  995. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  996. zap.String("grpc-address", m.grpcAddr),
  997. )
  998. m.Close()
  999. if !m.keepDataDirTerminate {
  1000. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  1001. t.Fatal(err)
  1002. }
  1003. }
  1004. lg.Info(
  1005. "terminated a member",
  1006. zap.String("name", m.Name),
  1007. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  1008. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  1009. zap.String("grpc-address", m.grpcAddr),
  1010. )
  1011. }
  1012. // Metric gets the metric value for a member
  1013. func (m *member) Metric(metricName string) (string, error) {
  1014. cfgtls := transport.TLSInfo{}
  1015. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  1016. if err != nil {
  1017. return "", err
  1018. }
  1019. cli := &http.Client{Transport: tr}
  1020. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1021. if err != nil {
  1022. return "", err
  1023. }
  1024. defer resp.Body.Close()
  1025. b, rerr := ioutil.ReadAll(resp.Body)
  1026. if rerr != nil {
  1027. return "", rerr
  1028. }
  1029. lines := strings.Split(string(b), "\n")
  1030. for _, l := range lines {
  1031. if strings.HasPrefix(l, metricName) {
  1032. return strings.Split(l, " ")[1], nil
  1033. }
  1034. }
  1035. return "", nil
  1036. }
  1037. // InjectPartition drops connections from m to others, vice versa.
  1038. func (m *member) InjectPartition(t testing.TB, others ...*member) {
  1039. for _, other := range others {
  1040. m.s.CutPeer(other.s.ID())
  1041. other.s.CutPeer(m.s.ID())
  1042. }
  1043. }
  1044. // RecoverPartition recovers connections from m to others, vice versa.
  1045. func (m *member) RecoverPartition(t testing.TB, others ...*member) {
  1046. for _, other := range others {
  1047. m.s.MendPeer(other.s.ID())
  1048. other.s.MendPeer(m.s.ID())
  1049. }
  1050. }
  1051. func (m *member) ReadyNotify() <-chan struct{} {
  1052. return m.s.ReadyNotify()
  1053. }
  1054. func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
  1055. cfgtls := transport.TLSInfo{}
  1056. if tls != nil {
  1057. cfgtls = *tls
  1058. }
  1059. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1060. c, err := client.New(cfg)
  1061. if err != nil {
  1062. t.Fatal(err)
  1063. }
  1064. return c
  1065. }
  1066. func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
  1067. // tick in integration test is short, so 1s dial timeout could play well.
  1068. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1069. if err != nil {
  1070. t.Fatal(err)
  1071. }
  1072. return tr
  1073. }
  1074. type SortableMemberSliceByPeerURLs []client.Member
  1075. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1076. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1077. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1078. }
  1079. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1080. type ClusterV3 struct {
  1081. *cluster
  1082. mu sync.Mutex
  1083. clients []*clientv3.Client
  1084. }
  1085. // NewClusterV3 returns a launched cluster with a grpc client connection
  1086. // for each cluster member.
  1087. func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
  1088. cfg.UseGRPC = true
  1089. if os.Getenv("CLIENT_DEBUG") != "" {
  1090. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1091. }
  1092. clus := &ClusterV3{
  1093. cluster: NewClusterByConfig(t, cfg),
  1094. }
  1095. clus.Launch(t)
  1096. if !cfg.SkipCreatingClient {
  1097. for _, m := range clus.Members {
  1098. client, err := NewClientV3(m)
  1099. if err != nil {
  1100. t.Fatalf("cannot create client: %v", err)
  1101. }
  1102. clus.clients = append(clus.clients, client)
  1103. }
  1104. }
  1105. return clus
  1106. }
  1107. func (c *ClusterV3) TakeClient(idx int) {
  1108. c.mu.Lock()
  1109. c.clients[idx] = nil
  1110. c.mu.Unlock()
  1111. }
  1112. func (c *ClusterV3) Terminate(t testing.TB) {
  1113. c.mu.Lock()
  1114. for _, client := range c.clients {
  1115. if client == nil {
  1116. continue
  1117. }
  1118. if err := client.Close(); err != nil {
  1119. t.Error(err)
  1120. }
  1121. }
  1122. c.mu.Unlock()
  1123. c.cluster.Terminate(t)
  1124. }
  1125. func (c *ClusterV3) RandClient() *clientv3.Client {
  1126. return c.clients[rand.Intn(len(c.clients))]
  1127. }
  1128. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1129. return c.clients[i]
  1130. }
  1131. type grpcAPI struct {
  1132. // Cluster is the cluster API for the client's connection.
  1133. Cluster pb.ClusterClient
  1134. // KV is the keyvalue API for the client's connection.
  1135. KV pb.KVClient
  1136. // Lease is the lease API for the client's connection.
  1137. Lease pb.LeaseClient
  1138. // Watch is the watch API for the client's connection.
  1139. Watch pb.WatchClient
  1140. // Maintenance is the maintenance API for the client's connection.
  1141. Maintenance pb.MaintenanceClient
  1142. // Auth is the authentication API for the client's connection.
  1143. Auth pb.AuthClient
  1144. // Lock is the lock API for the client's connection.
  1145. Lock lockpb.LockClient
  1146. // Election is the election API for the client's connection.
  1147. Election epb.ElectionClient
  1148. }
  1149. // GetLearnerMembers returns the list of learner members in cluster using MemberList API.
  1150. func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
  1151. cli := c.Client(0)
  1152. resp, err := cli.MemberList(context.Background())
  1153. if err != nil {
  1154. return nil, fmt.Errorf("failed to list member %v", err)
  1155. }
  1156. var learners []*pb.Member
  1157. for _, m := range resp.Members {
  1158. if m.IsLearner {
  1159. learners = append(learners, m)
  1160. }
  1161. }
  1162. return learners, nil
  1163. }
  1164. // AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
  1165. // via v3 MemberAdd API, and then launches the new member.
  1166. func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
  1167. m := c.mustNewMember(t)
  1168. m.isLearner = true
  1169. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  1170. peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}
  1171. cli := c.Client(0)
  1172. _, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
  1173. if err != nil {
  1174. t.Fatalf("failed to add learner member %v", err)
  1175. }
  1176. m.InitialPeerURLsMap = types.URLsMap{}
  1177. for _, mm := range c.Members {
  1178. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  1179. }
  1180. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  1181. m.NewCluster = false
  1182. if err := m.Launch(); err != nil {
  1183. t.Fatal(err)
  1184. }
  1185. c.Members = append(c.Members, m)
  1186. c.waitMembersMatch(t)
  1187. }
  1188. // getMembers returns a list of members in cluster, in format of etcdserverpb.Member
  1189. func (c *ClusterV3) getMembers() []*pb.Member {
  1190. var mems []*pb.Member
  1191. for _, m := range c.Members {
  1192. mem := &pb.Member{
  1193. Name: m.Name,
  1194. PeerURLs: m.PeerURLs.StringSlice(),
  1195. ClientURLs: m.ClientURLs.StringSlice(),
  1196. IsLearner: m.isLearner,
  1197. }
  1198. mems = append(mems, mem)
  1199. }
  1200. return mems
  1201. }
  1202. // waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
  1203. // local 'c.Members', which is the local recording of members in the testing cluster. With
  1204. // the exception that the local recording c.Members does not have info on Member.ID, which
  1205. // is generated when the member is been added to cluster.
  1206. //
  1207. // Note:
  1208. // A successful match means the Member.clientURLs are matched. This means member has already
  1209. // finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
  1210. // write request (in v2 server). Therefore, at this point, any raft log entries prior to this
  1211. // would have already been applied.
  1212. //
  1213. // If a new member was added to an existing cluster, at this point, it has finished publishing
  1214. // its own server attributes to the cluster. And therefore by the same argument, it has already
  1215. // applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
  1216. // the new member has the correct view of the cluster configuration.
  1217. //
  1218. // Special note on learner member:
  1219. // Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
  1220. // the learner member, its initial view of the cluster created by peerURLs map does not have info
  1221. // on whether or not the new member itself is learner. But at this point, a successful match does
  1222. // indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
  1223. // which was used to add the learner itself to the cluster, and therefore it has the correct info
  1224. // on learner.
  1225. func (c *ClusterV3) waitMembersMatch(t testing.TB) {
  1226. wMembers := c.getMembers()
  1227. sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
  1228. cli := c.Client(0)
  1229. for {
  1230. resp, err := cli.MemberList(context.Background())
  1231. if err != nil {
  1232. t.Fatalf("failed to list member %v", err)
  1233. }
  1234. if len(resp.Members) != len(wMembers) {
  1235. continue
  1236. }
  1237. sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
  1238. for _, m := range resp.Members {
  1239. m.ID = 0
  1240. }
  1241. if reflect.DeepEqual(resp.Members, wMembers) {
  1242. return
  1243. }
  1244. time.Sleep(tickDuration)
  1245. }
  1246. }
  1247. type SortableProtoMemberSliceByPeerURLs []*pb.Member
  1248. func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
  1249. func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
  1250. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1251. }
  1252. func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1253. // MustNewMember creates a new member instance based on the response of V3 Member Add API.
  1254. func (c *ClusterV3) MustNewMember(t testing.TB, resp *clientv3.MemberAddResponse) *member {
  1255. m := c.mustNewMember(t)
  1256. m.isLearner = resp.Member.IsLearner
  1257. m.NewCluster = false
  1258. m.InitialPeerURLsMap = types.URLsMap{}
  1259. for _, mm := range c.Members {
  1260. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  1261. }
  1262. m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs)
  1263. return m
  1264. }