cluster.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "os"
  26. "reflect"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "testing"
  32. "time"
  33. "go.etcd.io/etcd/client"
  34. "go.etcd.io/etcd/clientv3"
  35. "go.etcd.io/etcd/embed"
  36. "go.etcd.io/etcd/etcdserver"
  37. "go.etcd.io/etcd/etcdserver/api/etcdhttp"
  38. "go.etcd.io/etcd/etcdserver/api/rafthttp"
  39. "go.etcd.io/etcd/etcdserver/api/v2http"
  40. "go.etcd.io/etcd/etcdserver/api/v3client"
  41. "go.etcd.io/etcd/etcdserver/api/v3election"
  42. epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
  43. "go.etcd.io/etcd/etcdserver/api/v3lock"
  44. lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
  45. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  46. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  47. "go.etcd.io/etcd/pkg/logutil"
  48. "go.etcd.io/etcd/pkg/testutil"
  49. "go.etcd.io/etcd/pkg/tlsutil"
  50. "go.etcd.io/etcd/pkg/transport"
  51. "go.etcd.io/etcd/pkg/types"
  52. "github.com/soheilhy/cmux"
  53. "go.uber.org/zap"
  54. "golang.org/x/crypto/bcrypt"
  55. "google.golang.org/grpc"
  56. "google.golang.org/grpc/grpclog"
  57. "google.golang.org/grpc/keepalive"
  58. )
  59. const (
  60. // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
  61. RequestWaitTimeout = 3 * time.Second
  62. tickDuration = 10 * time.Millisecond
  63. requestTimeout = 20 * time.Second
  64. clusterName = "etcd"
  65. basePort = 21000
  66. URLScheme = "unix"
  67. URLSchemeTLS = "unixs"
  68. )
  69. var (
  70. electionTicks = 10
  71. // integration test uses unique ports, counting up, to listen for each
  72. // member, ensuring restarted members can listen on the same port again.
  73. localListenCount = int64(0)
  74. testTLSInfo = transport.TLSInfo{
  75. KeyFile: "./fixtures/server.key.insecure",
  76. CertFile: "./fixtures/server.crt",
  77. TrustedCAFile: "./fixtures/ca.crt",
  78. ClientCertAuth: true,
  79. }
  80. testTLSInfoIP = transport.TLSInfo{
  81. KeyFile: "./fixtures/server-ip.key.insecure",
  82. CertFile: "./fixtures/server-ip.crt",
  83. TrustedCAFile: "./fixtures/ca.crt",
  84. ClientCertAuth: true,
  85. }
  86. testTLSInfoExpired = transport.TLSInfo{
  87. KeyFile: "./fixtures-expired/server.key.insecure",
  88. CertFile: "./fixtures-expired/server.crt",
  89. TrustedCAFile: "./fixtures-expired/ca.crt",
  90. ClientCertAuth: true,
  91. }
  92. testTLSInfoExpiredIP = transport.TLSInfo{
  93. KeyFile: "./fixtures-expired/server-ip.key.insecure",
  94. CertFile: "./fixtures-expired/server-ip.crt",
  95. TrustedCAFile: "./fixtures-expired/ca.crt",
  96. ClientCertAuth: true,
  97. }
  98. defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"
  99. lg = zap.NewNop()
  100. )
  101. func init() {
  102. if os.Getenv("CLUSTER_DEBUG") != "" {
  103. lg, _ = zap.NewProduction()
  104. }
  105. }
  106. type ClusterConfig struct {
  107. Size int
  108. PeerTLS *transport.TLSInfo
  109. ClientTLS *transport.TLSInfo
  110. DiscoveryURL string
  111. AuthToken string
  112. UseGRPC bool
  113. QuotaBackendBytes int64
  114. MaxTxnOps uint
  115. MaxRequestBytes uint
  116. SnapshotCount uint64
  117. SnapshotCatchUpEntries uint64
  118. GRPCKeepAliveMinTime time.Duration
  119. GRPCKeepAliveInterval time.Duration
  120. GRPCKeepAliveTimeout time.Duration
  121. // SkipCreatingClient to skip creating clients for each member.
  122. SkipCreatingClient bool
  123. ClientMaxCallSendMsgSize int
  124. ClientMaxCallRecvMsgSize int
  125. // UseIP is true to use only IP for gRPC requests.
  126. UseIP bool
  127. LeaseCheckpointInterval time.Duration
  128. }
  129. type cluster struct {
  130. cfg *ClusterConfig
  131. Members []*member
  132. }
  133. func schemeFromTLSInfo(tls *transport.TLSInfo) string {
  134. if tls == nil {
  135. return URLScheme
  136. }
  137. return URLSchemeTLS
  138. }
  139. func (c *cluster) fillClusterForMembers() error {
  140. if c.cfg.DiscoveryURL != "" {
  141. // cluster will be discovered
  142. return nil
  143. }
  144. addrs := make([]string, 0)
  145. for _, m := range c.Members {
  146. scheme := schemeFromTLSInfo(m.PeerTLSInfo)
  147. for _, l := range m.PeerListeners {
  148. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  149. }
  150. }
  151. clusterStr := strings.Join(addrs, ",")
  152. var err error
  153. for _, m := range c.Members {
  154. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  155. if err != nil {
  156. return err
  157. }
  158. }
  159. return nil
  160. }
  161. func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
  162. c := &cluster{cfg: cfg}
  163. ms := make([]*member, cfg.Size)
  164. for i := 0; i < cfg.Size; i++ {
  165. ms[i] = c.mustNewMember(t)
  166. }
  167. c.Members = ms
  168. if err := c.fillClusterForMembers(); err != nil {
  169. t.Fatal(err)
  170. }
  171. return c
  172. }
  173. // NewCluster returns an unlaunched cluster of the given size which has been
  174. // set to use static bootstrap.
  175. func NewCluster(t testing.TB, size int) *cluster {
  176. return newCluster(t, &ClusterConfig{Size: size})
  177. }
  178. // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
  179. func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
  180. return newCluster(t, cfg)
  181. }
  182. func (c *cluster) Launch(t testing.TB) {
  183. errc := make(chan error)
  184. for _, m := range c.Members {
  185. // Members are launched in separate goroutines because if they boot
  186. // using discovery url, they have to wait for others to register to continue.
  187. go func(m *member) {
  188. errc <- m.Launch()
  189. }(m)
  190. }
  191. for range c.Members {
  192. if err := <-errc; err != nil {
  193. t.Fatalf("error setting up member: %v", err)
  194. }
  195. }
  196. // wait cluster to be stable to receive future client requests
  197. c.waitMembersMatch(t, c.HTTPMembers())
  198. c.waitVersion()
  199. }
  200. func (c *cluster) URL(i int) string {
  201. return c.Members[i].ClientURLs[0].String()
  202. }
  203. // URLs returns a list of all active client URLs in the cluster
  204. func (c *cluster) URLs() []string {
  205. return getMembersURLs(c.Members)
  206. }
  207. func getMembersURLs(members []*member) []string {
  208. urls := make([]string, 0)
  209. for _, m := range members {
  210. select {
  211. case <-m.s.StopNotify():
  212. continue
  213. default:
  214. }
  215. for _, u := range m.ClientURLs {
  216. urls = append(urls, u.String())
  217. }
  218. }
  219. return urls
  220. }
  221. // HTTPMembers returns a list of all active members as client.Members
  222. func (c *cluster) HTTPMembers() []client.Member {
  223. ms := []client.Member{}
  224. for _, m := range c.Members {
  225. pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
  226. cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
  227. cm := client.Member{Name: m.Name}
  228. for _, ln := range m.PeerListeners {
  229. cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
  230. }
  231. for _, ln := range m.ClientListeners {
  232. cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
  233. }
  234. ms = append(ms, cm)
  235. }
  236. return ms
  237. }
  238. func (c *cluster) mustNewMember(t testing.TB) *member {
  239. m := mustNewMember(t,
  240. memberConfig{
  241. name: c.name(rand.Int()),
  242. authToken: c.cfg.AuthToken,
  243. peerTLS: c.cfg.PeerTLS,
  244. clientTLS: c.cfg.ClientTLS,
  245. quotaBackendBytes: c.cfg.QuotaBackendBytes,
  246. maxTxnOps: c.cfg.MaxTxnOps,
  247. maxRequestBytes: c.cfg.MaxRequestBytes,
  248. snapshotCount: c.cfg.SnapshotCount,
  249. snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
  250. grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
  251. grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
  252. grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
  253. clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
  254. clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
  255. useIP: c.cfg.UseIP,
  256. leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
  257. })
  258. m.DiscoveryURL = c.cfg.DiscoveryURL
  259. if c.cfg.UseGRPC {
  260. if err := m.listenGRPC(); err != nil {
  261. t.Fatal(err)
  262. }
  263. }
  264. return m
  265. }
  266. func (c *cluster) addMember(t testing.TB) {
  267. m := c.mustNewMember(t)
  268. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  269. // send add request to the cluster
  270. var err error
  271. for i := 0; i < len(c.Members); i++ {
  272. clientURL := c.URL(i)
  273. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  274. if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
  275. break
  276. }
  277. }
  278. if err != nil {
  279. t.Fatalf("add member failed on all members error: %v", err)
  280. }
  281. m.InitialPeerURLsMap = types.URLsMap{}
  282. for _, mm := range c.Members {
  283. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  284. }
  285. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  286. m.NewCluster = false
  287. if err := m.Launch(); err != nil {
  288. t.Fatal(err)
  289. }
  290. c.Members = append(c.Members, m)
  291. // wait cluster to be stable to receive future client requests
  292. c.waitMembersMatch(t, c.HTTPMembers())
  293. }
  294. func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
  295. cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
  296. ma := client.NewMembersAPI(cc)
  297. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  298. _, err := ma.Add(ctx, peerURL)
  299. cancel()
  300. if err != nil {
  301. return err
  302. }
  303. // wait for the add node entry applied in the cluster
  304. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  305. c.waitMembersMatch(t, members)
  306. return nil
  307. }
  308. func (c *cluster) AddMember(t testing.TB) {
  309. c.addMember(t)
  310. }
  311. func (c *cluster) RemoveMember(t testing.TB, id uint64) {
  312. if err := c.removeMember(t, id); err != nil {
  313. t.Fatal(err)
  314. }
  315. }
  316. func (c *cluster) removeMember(t testing.TB, id uint64) error {
  317. // send remove request to the cluster
  318. cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
  319. ma := client.NewMembersAPI(cc)
  320. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  321. err := ma.Remove(ctx, types.ID(id).String())
  322. cancel()
  323. if err != nil {
  324. return err
  325. }
  326. newMembers := make([]*member, 0)
  327. for _, m := range c.Members {
  328. if uint64(m.s.ID()) != id {
  329. newMembers = append(newMembers, m)
  330. } else {
  331. select {
  332. case <-m.s.StopNotify():
  333. m.Terminate(t)
  334. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  335. // TODO: remove connection write timeout by selecting on http response closeNotifier
  336. // blocking on https://github.com/golang/go/issues/9524
  337. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  338. t.Fatalf("failed to remove member %s in time", m.s.ID())
  339. }
  340. }
  341. }
  342. c.Members = newMembers
  343. c.waitMembersMatch(t, c.HTTPMembers())
  344. return nil
  345. }
  346. func (c *cluster) Terminate(t testing.TB) {
  347. var wg sync.WaitGroup
  348. wg.Add(len(c.Members))
  349. for _, m := range c.Members {
  350. go func(mm *member) {
  351. defer wg.Done()
  352. mm.Terminate(t)
  353. }(m)
  354. }
  355. wg.Wait()
  356. }
  357. func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
  358. for _, u := range c.URLs() {
  359. cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
  360. ma := client.NewMembersAPI(cc)
  361. for {
  362. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  363. ms, err := ma.List(ctx)
  364. cancel()
  365. if err == nil && isMembersEqual(ms, membs) {
  366. break
  367. }
  368. time.Sleep(tickDuration)
  369. }
  370. }
  371. }
  372. func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
  373. // waitLeader waits until given members agree on the same leader.
  374. func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
  375. possibleLead := make(map[uint64]bool)
  376. var lead uint64
  377. for _, m := range membs {
  378. possibleLead[uint64(m.s.ID())] = true
  379. }
  380. cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
  381. kapi := client.NewKeysAPI(cc)
  382. // ensure leader is up via linearizable get
  383. for {
  384. ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
  385. _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
  386. cancel()
  387. if err == nil || strings.Contains(err.Error(), "Key not found") {
  388. break
  389. }
  390. }
  391. for lead == 0 || !possibleLead[lead] {
  392. lead = 0
  393. for _, m := range membs {
  394. select {
  395. case <-m.s.StopNotify():
  396. continue
  397. default:
  398. }
  399. if lead != 0 && lead != m.s.Lead() {
  400. lead = 0
  401. time.Sleep(10 * tickDuration)
  402. break
  403. }
  404. lead = m.s.Lead()
  405. }
  406. }
  407. for i, m := range membs {
  408. if uint64(m.s.ID()) == lead {
  409. return i
  410. }
  411. }
  412. return -1
  413. }
  414. func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
  415. // waitNoLeader waits until given members lose leader.
  416. func (c *cluster) waitNoLeader(membs []*member) {
  417. noLeader := false
  418. for !noLeader {
  419. noLeader = true
  420. for _, m := range membs {
  421. select {
  422. case <-m.s.StopNotify():
  423. continue
  424. default:
  425. }
  426. if m.s.Lead() != 0 {
  427. noLeader = false
  428. time.Sleep(10 * tickDuration)
  429. break
  430. }
  431. }
  432. }
  433. }
  434. func (c *cluster) waitVersion() {
  435. for _, m := range c.Members {
  436. for {
  437. if m.s.ClusterVersion() != nil {
  438. break
  439. }
  440. time.Sleep(tickDuration)
  441. }
  442. }
  443. }
  444. func (c *cluster) name(i int) string {
  445. return fmt.Sprint(i)
  446. }
  447. // isMembersEqual checks whether two members equal except ID field.
  448. // The given wmembs should always set ID field to empty string.
  449. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  450. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  451. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  452. for i := range membs {
  453. membs[i].ID = ""
  454. }
  455. return reflect.DeepEqual(membs, wmembs)
  456. }
  457. func newLocalListener(t testing.TB) net.Listener {
  458. c := atomic.AddInt64(&localListenCount, 1)
  459. // Go 1.8+ allows only numbers in port
  460. addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
  461. return NewListenerWithAddr(t, addr)
  462. }
  463. func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
  464. l, err := transport.NewUnixListener(addr)
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. return l
  469. }
  470. type member struct {
  471. etcdserver.ServerConfig
  472. PeerListeners, ClientListeners []net.Listener
  473. grpcListener net.Listener
  474. // PeerTLSInfo enables peer TLS when set
  475. PeerTLSInfo *transport.TLSInfo
  476. // ClientTLSInfo enables client TLS when set
  477. ClientTLSInfo *transport.TLSInfo
  478. DialOptions []grpc.DialOption
  479. raftHandler *testutil.PauseableHandler
  480. s *etcdserver.EtcdServer
  481. serverClosers []func()
  482. grpcServerOpts []grpc.ServerOption
  483. grpcServer *grpc.Server
  484. grpcServerPeer *grpc.Server
  485. grpcAddr string
  486. grpcBridge *bridge
  487. // serverClient is a clientv3 that directly calls the etcdserver.
  488. serverClient *clientv3.Client
  489. keepDataDirTerminate bool
  490. clientMaxCallSendMsgSize int
  491. clientMaxCallRecvMsgSize int
  492. useIP bool
  493. isLearner bool
  494. }
  495. func (m *member) GRPCAddr() string { return m.grpcAddr }
  496. type memberConfig struct {
  497. name string
  498. peerTLS *transport.TLSInfo
  499. clientTLS *transport.TLSInfo
  500. authToken string
  501. quotaBackendBytes int64
  502. maxTxnOps uint
  503. maxRequestBytes uint
  504. snapshotCount uint64
  505. snapshotCatchUpEntries uint64
  506. grpcKeepAliveMinTime time.Duration
  507. grpcKeepAliveInterval time.Duration
  508. grpcKeepAliveTimeout time.Duration
  509. clientMaxCallSendMsgSize int
  510. clientMaxCallRecvMsgSize int
  511. useIP bool
  512. leaseCheckpointInterval time.Duration
  513. }
  514. // mustNewMember return an inited member with the given name. If peerTLS is
  515. // set, it will use https scheme to communicate between peers.
  516. func mustNewMember(t testing.TB, mcfg memberConfig) *member {
  517. var err error
  518. m := &member{}
  519. peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
  520. clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
  521. pln := newLocalListener(t)
  522. m.PeerListeners = []net.Listener{pln}
  523. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  524. if err != nil {
  525. t.Fatal(err)
  526. }
  527. m.PeerTLSInfo = mcfg.peerTLS
  528. cln := newLocalListener(t)
  529. m.ClientListeners = []net.Listener{cln}
  530. m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. m.ClientTLSInfo = mcfg.clientTLS
  535. m.Name = mcfg.name
  536. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  537. if err != nil {
  538. t.Fatal(err)
  539. }
  540. clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
  541. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. m.InitialClusterToken = clusterName
  546. m.NewCluster = true
  547. m.BootstrapTimeout = 10 * time.Millisecond
  548. if m.PeerTLSInfo != nil {
  549. m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
  550. }
  551. m.ElectionTicks = electionTicks
  552. m.InitialElectionTickAdvance = true
  553. m.TickMs = uint(tickDuration / time.Millisecond)
  554. m.QuotaBackendBytes = mcfg.quotaBackendBytes
  555. m.MaxTxnOps = mcfg.maxTxnOps
  556. if m.MaxTxnOps == 0 {
  557. m.MaxTxnOps = embed.DefaultMaxTxnOps
  558. }
  559. m.MaxRequestBytes = mcfg.maxRequestBytes
  560. if m.MaxRequestBytes == 0 {
  561. m.MaxRequestBytes = embed.DefaultMaxRequestBytes
  562. }
  563. m.SnapshotCount = etcdserver.DefaultSnapshotCount
  564. if mcfg.snapshotCount != 0 {
  565. m.SnapshotCount = mcfg.snapshotCount
  566. }
  567. m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
  568. if mcfg.snapshotCatchUpEntries != 0 {
  569. m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
  570. }
  571. // for the purpose of integration testing, simple token is enough
  572. m.AuthToken = "simple"
  573. if mcfg.authToken != "" {
  574. m.AuthToken = mcfg.authToken
  575. }
  576. m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
  577. m.grpcServerOpts = []grpc.ServerOption{}
  578. if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
  579. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  580. MinTime: mcfg.grpcKeepAliveMinTime,
  581. PermitWithoutStream: false,
  582. }))
  583. }
  584. if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
  585. mcfg.grpcKeepAliveTimeout > time.Duration(0) {
  586. m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
  587. Time: mcfg.grpcKeepAliveInterval,
  588. Timeout: mcfg.grpcKeepAliveTimeout,
  589. }))
  590. }
  591. m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
  592. m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
  593. m.useIP = mcfg.useIP
  594. m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
  595. m.InitialCorruptCheck = true
  596. lcfg := logutil.DefaultZapLoggerConfig
  597. m.LoggerConfig = &lcfg
  598. m.LoggerConfig.OutputPaths = []string{"/dev/null"}
  599. m.LoggerConfig.ErrorOutputPaths = []string{"/dev/null"}
  600. if os.Getenv("CLUSTER_DEBUG") != "" {
  601. m.LoggerConfig.OutputPaths = []string{"stderr"}
  602. m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
  603. }
  604. m.Logger, err = m.LoggerConfig.Build()
  605. if err != nil {
  606. t.Fatal(err)
  607. }
  608. return m
  609. }
  610. // listenGRPC starts a grpc server over a unix domain socket on the member
  611. func (m *member) listenGRPC() error {
  612. // prefix with localhost so cert has right domain
  613. m.grpcAddr = "localhost:" + m.Name
  614. if m.useIP { // for IP-only TLS certs
  615. m.grpcAddr = "127.0.0.1:" + m.Name
  616. }
  617. l, err := transport.NewUnixListener(m.grpcAddr)
  618. if err != nil {
  619. return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
  620. }
  621. m.grpcBridge, err = newBridge(m.grpcAddr)
  622. if err != nil {
  623. l.Close()
  624. return err
  625. }
  626. m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
  627. m.grpcListener = l
  628. return nil
  629. }
  630. func (m *member) ElectionTimeout() time.Duration {
  631. return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
  632. }
  633. func (m *member) ID() types.ID { return m.s.ID() }
  634. func (m *member) DropConnections() { m.grpcBridge.Reset() }
  635. func (m *member) PauseConnections() { m.grpcBridge.Pause() }
  636. func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
  637. func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
  638. func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
  639. // NewClientV3 creates a new grpc client connection to the member
  640. func NewClientV3(m *member) (*clientv3.Client, error) {
  641. if m.grpcAddr == "" {
  642. return nil, fmt.Errorf("member not configured for grpc")
  643. }
  644. cfg := clientv3.Config{
  645. Endpoints: []string{m.grpcAddr},
  646. DialTimeout: 5 * time.Second,
  647. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  648. MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
  649. MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
  650. }
  651. if m.ClientTLSInfo != nil {
  652. tls, err := m.ClientTLSInfo.ClientConfig()
  653. if err != nil {
  654. return nil, err
  655. }
  656. cfg.TLS = tls
  657. }
  658. if m.DialOptions != nil {
  659. cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
  660. }
  661. return newClientV3(cfg)
  662. }
  663. // Clone returns a member with the same server configuration. The returned
  664. // member will not set PeerListeners and ClientListeners.
  665. func (m *member) Clone(t testing.TB) *member {
  666. mm := &member{}
  667. mm.ServerConfig = m.ServerConfig
  668. var err error
  669. clientURLStrs := m.ClientURLs.StringSlice()
  670. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  671. if err != nil {
  672. // this should never fail
  673. panic(err)
  674. }
  675. peerURLStrs := m.PeerURLs.StringSlice()
  676. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  677. if err != nil {
  678. // this should never fail
  679. panic(err)
  680. }
  681. clusterStr := m.InitialPeerURLsMap.String()
  682. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  683. if err != nil {
  684. // this should never fail
  685. panic(err)
  686. }
  687. mm.InitialClusterToken = m.InitialClusterToken
  688. mm.ElectionTicks = m.ElectionTicks
  689. mm.PeerTLSInfo = m.PeerTLSInfo
  690. mm.ClientTLSInfo = m.ClientTLSInfo
  691. return mm
  692. }
  693. // Launch starts a member based on ServerConfig, PeerListeners
  694. // and ClientListeners.
  695. func (m *member) Launch() error {
  696. lg.Info(
  697. "launching a member",
  698. zap.String("name", m.Name),
  699. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  700. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  701. zap.String("grpc-address", m.grpcAddr),
  702. )
  703. var err error
  704. if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
  705. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  706. }
  707. m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
  708. m.s.Start()
  709. var peerTLScfg *tls.Config
  710. if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
  711. if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
  712. return err
  713. }
  714. }
  715. if m.grpcListener != nil {
  716. var (
  717. tlscfg *tls.Config
  718. )
  719. if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
  720. tlscfg, err = m.ClientTLSInfo.ServerConfig()
  721. if err != nil {
  722. return err
  723. }
  724. }
  725. m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
  726. m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
  727. m.serverClient = v3client.New(m.s)
  728. lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
  729. epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
  730. go m.grpcServer.Serve(m.grpcListener)
  731. }
  732. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
  733. h := (http.Handler)(m.raftHandler)
  734. if m.grpcListener != nil {
  735. h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  736. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  737. m.grpcServerPeer.ServeHTTP(w, r)
  738. } else {
  739. m.raftHandler.ServeHTTP(w, r)
  740. }
  741. })
  742. }
  743. for _, ln := range m.PeerListeners {
  744. cm := cmux.New(ln)
  745. // don't hang on matcher after closing listener
  746. cm.SetReadTimeout(time.Second)
  747. if m.grpcServer != nil {
  748. grpcl := cm.Match(cmux.HTTP2())
  749. go m.grpcServerPeer.Serve(grpcl)
  750. }
  751. // serve http1/http2 rafthttp/grpc
  752. ll := cm.Match(cmux.Any())
  753. if peerTLScfg != nil {
  754. if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
  755. return err
  756. }
  757. }
  758. hs := &httptest.Server{
  759. Listener: ll,
  760. Config: &http.Server{
  761. Handler: h,
  762. TLSConfig: peerTLScfg,
  763. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  764. },
  765. TLS: peerTLScfg,
  766. }
  767. hs.Start()
  768. donec := make(chan struct{})
  769. go func() {
  770. defer close(donec)
  771. cm.Serve()
  772. }()
  773. closer := func() {
  774. ll.Close()
  775. hs.CloseClientConnections()
  776. hs.Close()
  777. <-donec
  778. }
  779. m.serverClosers = append(m.serverClosers, closer)
  780. }
  781. for _, ln := range m.ClientListeners {
  782. hs := &httptest.Server{
  783. Listener: ln,
  784. Config: &http.Server{
  785. Handler: v2http.NewClientHandler(
  786. m.Logger,
  787. m.s,
  788. m.ServerConfig.ReqTimeout(),
  789. ),
  790. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  791. },
  792. }
  793. if m.ClientTLSInfo == nil {
  794. hs.Start()
  795. } else {
  796. info := m.ClientTLSInfo
  797. hs.TLS, err = info.ServerConfig()
  798. if err != nil {
  799. return err
  800. }
  801. // baseConfig is called on initial TLS handshake start.
  802. //
  803. // Previously,
  804. // 1. Server has non-empty (*tls.Config).Certificates on client hello
  805. // 2. Server calls (*tls.Config).GetCertificate iff:
  806. // - Server's (*tls.Config).Certificates is not empty, or
  807. // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
  808. //
  809. // When (*tls.Config).Certificates is always populated on initial handshake,
  810. // client is expected to provide a valid matching SNI to pass the TLS
  811. // verification, thus trigger server (*tls.Config).GetCertificate to reload
  812. // TLS assets. However, a cert whose SAN field does not include domain names
  813. // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
  814. // it was never able to trigger TLS reload on initial handshake; first
  815. // ceritifcate object was being used, never being updated.
  816. //
  817. // Now, (*tls.Config).Certificates is created empty on initial TLS client
  818. // handshake, in order to trigger (*tls.Config).GetCertificate and populate
  819. // rest of the certificates on every new TLS connection, even when client
  820. // SNI is empty (e.g. cert only includes IPs).
  821. //
  822. // This introduces another problem with "httptest.Server":
  823. // when server initial certificates are empty, certificates
  824. // are overwritten by Go's internal test certs, which have
  825. // different SAN fields (e.g. example.com). To work around,
  826. // re-overwrite (*tls.Config).Certificates before starting
  827. // test server.
  828. tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
  829. if err != nil {
  830. return err
  831. }
  832. hs.TLS.Certificates = []tls.Certificate{*tlsCert}
  833. hs.StartTLS()
  834. }
  835. closer := func() {
  836. ln.Close()
  837. hs.CloseClientConnections()
  838. hs.Close()
  839. }
  840. m.serverClosers = append(m.serverClosers, closer)
  841. }
  842. lg.Info(
  843. "launched a member",
  844. zap.String("name", m.Name),
  845. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  846. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  847. zap.String("grpc-address", m.grpcAddr),
  848. )
  849. return nil
  850. }
  851. func (m *member) WaitOK(t testing.TB) {
  852. m.WaitStarted(t)
  853. for m.s.Leader() == 0 {
  854. time.Sleep(tickDuration)
  855. }
  856. }
  857. func (m *member) WaitStarted(t testing.TB) {
  858. cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
  859. kapi := client.NewKeysAPI(cc)
  860. for {
  861. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  862. _, err := kapi.Get(ctx, "/", nil)
  863. if err != nil {
  864. time.Sleep(tickDuration)
  865. continue
  866. }
  867. cancel()
  868. break
  869. }
  870. }
  871. func WaitClientV3(t testing.TB, kv clientv3.KV) {
  872. timeout := time.Now().Add(requestTimeout)
  873. var err error
  874. for time.Now().Before(timeout) {
  875. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  876. _, err = kv.Get(ctx, "/")
  877. cancel()
  878. if err == nil {
  879. return
  880. }
  881. time.Sleep(tickDuration)
  882. }
  883. if err != nil {
  884. t.Fatalf("timed out waiting for client: %v", err)
  885. }
  886. }
  887. func (m *member) URL() string { return m.ClientURLs[0].String() }
  888. func (m *member) Pause() {
  889. m.raftHandler.Pause()
  890. m.s.PauseSending()
  891. }
  892. func (m *member) Resume() {
  893. m.raftHandler.Resume()
  894. m.s.ResumeSending()
  895. }
  896. // Close stops the member's etcdserver and closes its connections
  897. func (m *member) Close() {
  898. if m.grpcBridge != nil {
  899. m.grpcBridge.Close()
  900. m.grpcBridge = nil
  901. }
  902. if m.serverClient != nil {
  903. m.serverClient.Close()
  904. m.serverClient = nil
  905. }
  906. if m.grpcServer != nil {
  907. m.grpcServer.Stop()
  908. m.grpcServer.GracefulStop()
  909. m.grpcServer = nil
  910. m.grpcServerPeer.Stop()
  911. m.grpcServerPeer.GracefulStop()
  912. m.grpcServerPeer = nil
  913. }
  914. m.s.HardStop()
  915. for _, f := range m.serverClosers {
  916. f()
  917. }
  918. }
  919. // Stop stops the member, but the data dir of the member is preserved.
  920. func (m *member) Stop(t testing.TB) {
  921. lg.Info(
  922. "stopping a member",
  923. zap.String("name", m.Name),
  924. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  925. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  926. zap.String("grpc-address", m.grpcAddr),
  927. )
  928. m.Close()
  929. m.serverClosers = nil
  930. lg.Info(
  931. "stopped a member",
  932. zap.String("name", m.Name),
  933. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  934. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  935. zap.String("grpc-address", m.grpcAddr),
  936. )
  937. }
  938. // checkLeaderTransition waits for leader transition, returning the new leader ID.
  939. func checkLeaderTransition(m *member, oldLead uint64) uint64 {
  940. interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
  941. for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
  942. time.Sleep(interval)
  943. }
  944. return m.s.Lead()
  945. }
  946. // StopNotify unblocks when a member stop completes
  947. func (m *member) StopNotify() <-chan struct{} {
  948. return m.s.StopNotify()
  949. }
  950. // Restart starts the member using the preserved data dir.
  951. func (m *member) Restart(t testing.TB) error {
  952. lg.Info(
  953. "restarting a member",
  954. zap.String("name", m.Name),
  955. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  956. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  957. zap.String("grpc-address", m.grpcAddr),
  958. )
  959. newPeerListeners := make([]net.Listener, 0)
  960. for _, ln := range m.PeerListeners {
  961. newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
  962. }
  963. m.PeerListeners = newPeerListeners
  964. newClientListeners := make([]net.Listener, 0)
  965. for _, ln := range m.ClientListeners {
  966. newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
  967. }
  968. m.ClientListeners = newClientListeners
  969. if m.grpcListener != nil {
  970. if err := m.listenGRPC(); err != nil {
  971. t.Fatal(err)
  972. }
  973. }
  974. err := m.Launch()
  975. lg.Info(
  976. "restarted a member",
  977. zap.String("name", m.Name),
  978. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  979. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  980. zap.String("grpc-address", m.grpcAddr),
  981. zap.Error(err),
  982. )
  983. return err
  984. }
  985. // Terminate stops the member and removes the data dir.
  986. func (m *member) Terminate(t testing.TB) {
  987. lg.Info(
  988. "terminating a member",
  989. zap.String("name", m.Name),
  990. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  991. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  992. zap.String("grpc-address", m.grpcAddr),
  993. )
  994. m.Close()
  995. if !m.keepDataDirTerminate {
  996. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  997. t.Fatal(err)
  998. }
  999. }
  1000. lg.Info(
  1001. "terminated a member",
  1002. zap.String("name", m.Name),
  1003. zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
  1004. zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
  1005. zap.String("grpc-address", m.grpcAddr),
  1006. )
  1007. }
  1008. // Metric gets the metric value for a member
  1009. func (m *member) Metric(metricName string) (string, error) {
  1010. cfgtls := transport.TLSInfo{}
  1011. tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
  1012. if err != nil {
  1013. return "", err
  1014. }
  1015. cli := &http.Client{Transport: tr}
  1016. resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
  1017. if err != nil {
  1018. return "", err
  1019. }
  1020. defer resp.Body.Close()
  1021. b, rerr := ioutil.ReadAll(resp.Body)
  1022. if rerr != nil {
  1023. return "", rerr
  1024. }
  1025. lines := strings.Split(string(b), "\n")
  1026. for _, l := range lines {
  1027. if strings.HasPrefix(l, metricName) {
  1028. return strings.Split(l, " ")[1], nil
  1029. }
  1030. }
  1031. return "", nil
  1032. }
  1033. // InjectPartition drops connections from m to others, vice versa.
  1034. func (m *member) InjectPartition(t testing.TB, others ...*member) {
  1035. for _, other := range others {
  1036. m.s.CutPeer(other.s.ID())
  1037. other.s.CutPeer(m.s.ID())
  1038. }
  1039. }
  1040. // RecoverPartition recovers connections from m to others, vice versa.
  1041. func (m *member) RecoverPartition(t testing.TB, others ...*member) {
  1042. for _, other := range others {
  1043. m.s.MendPeer(other.s.ID())
  1044. other.s.MendPeer(m.s.ID())
  1045. }
  1046. }
  1047. func (m *member) ReadyNotify() <-chan struct{} {
  1048. return m.s.ReadyNotify()
  1049. }
  1050. func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
  1051. cfgtls := transport.TLSInfo{}
  1052. if tls != nil {
  1053. cfgtls = *tls
  1054. }
  1055. cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
  1056. c, err := client.New(cfg)
  1057. if err != nil {
  1058. t.Fatal(err)
  1059. }
  1060. return c
  1061. }
  1062. func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
  1063. // tick in integration test is short, so 1s dial timeout could play well.
  1064. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  1065. if err != nil {
  1066. t.Fatal(err)
  1067. }
  1068. return tr
  1069. }
  1070. type SortableMemberSliceByPeerURLs []client.Member
  1071. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  1072. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  1073. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1074. }
  1075. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// ClusterV3 is an integration-test cluster plus the v3 clients created for
// its members.
type ClusterV3 struct {
	*cluster

	// mu is taken by TakeClient and Terminate when they touch clients;
	// RandClient and Client read clients without it.
	mu sync.Mutex
	// clients holds the v3 clients appended by NewClusterV3, one per member in
	// c.Members order (empty when SkipCreatingClient is set). TakeClient may
	// set entries to nil.
	clients []*clientv3.Client
}
  1081. // NewClusterV3 returns a launched cluster with a grpc client connection
  1082. // for each cluster member.
  1083. func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
  1084. cfg.UseGRPC = true
  1085. if os.Getenv("CLIENT_DEBUG") != "" {
  1086. clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
  1087. }
  1088. clus := &ClusterV3{
  1089. cluster: NewClusterByConfig(t, cfg),
  1090. }
  1091. clus.Launch(t)
  1092. if !cfg.SkipCreatingClient {
  1093. for _, m := range clus.Members {
  1094. client, err := NewClientV3(m)
  1095. if err != nil {
  1096. t.Fatalf("cannot create client: %v", err)
  1097. }
  1098. clus.clients = append(clus.clients, client)
  1099. }
  1100. }
  1101. return clus
  1102. }
  1103. func (c *ClusterV3) TakeClient(idx int) {
  1104. c.mu.Lock()
  1105. c.clients[idx] = nil
  1106. c.mu.Unlock()
  1107. }
  1108. func (c *ClusterV3) Terminate(t testing.TB) {
  1109. c.mu.Lock()
  1110. for _, client := range c.clients {
  1111. if client == nil {
  1112. continue
  1113. }
  1114. if err := client.Close(); err != nil {
  1115. t.Error(err)
  1116. }
  1117. }
  1118. c.mu.Unlock()
  1119. c.cluster.Terminate(t)
  1120. }
  1121. func (c *ClusterV3) RandClient() *clientv3.Client {
  1122. return c.clients[rand.Intn(len(c.clients))]
  1123. }
  1124. func (c *ClusterV3) Client(i int) *clientv3.Client {
  1125. return c.clients[i]
  1126. }
// grpcAPI bundles one gRPC client stub per v3 service (the etcdserverpb
// services plus the lockpb and epb services) for a single client connection.
type grpcAPI struct {
	// Cluster is the cluster API for the client's connection.
	Cluster pb.ClusterClient
	// KV is the keyvalue API for the client's connection.
	KV pb.KVClient
	// Lease is the lease API for the client's connection.
	Lease pb.LeaseClient
	// Watch is the watch API for the client's connection.
	Watch pb.WatchClient
	// Maintenance is the maintenance API for the client's connection.
	Maintenance pb.MaintenanceClient
	// Auth is the authentication API for the client's connection.
	Auth pb.AuthClient
	// Lock is the lock API for the client's connection.
	Lock lockpb.LockClient
	// Election is the election API for the client's connection.
	Election epb.ElectionClient
}
  1145. // GetLearnerMembers returns the list of learner members in cluster using MemberList API.
  1146. func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
  1147. cli := c.Client(0)
  1148. resp, err := cli.MemberList(context.Background())
  1149. if err != nil {
  1150. return nil, fmt.Errorf("failed to list member %v", err)
  1151. }
  1152. var learners []*pb.Member
  1153. for _, m := range resp.Members {
  1154. if m.IsLearner {
  1155. learners = append(learners, m)
  1156. }
  1157. }
  1158. return learners, nil
  1159. }
  1160. // AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
  1161. // via v3 MemberAdd API, and then launches the new member.
  1162. func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
  1163. m := c.mustNewMember(t)
  1164. m.isLearner = true
  1165. scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
  1166. peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}
  1167. cli := c.Client(0)
  1168. _, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
  1169. if err != nil {
  1170. t.Fatalf("failed to add learner member %v", err)
  1171. }
  1172. m.InitialPeerURLsMap = types.URLsMap{}
  1173. for _, mm := range c.Members {
  1174. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  1175. }
  1176. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  1177. m.NewCluster = false
  1178. if err := m.Launch(); err != nil {
  1179. t.Fatal(err)
  1180. }
  1181. c.Members = append(c.Members, m)
  1182. c.waitMembersMatch(t)
  1183. }
  1184. // getMembers returns a list of members in cluster, in format of etcdserverpb.Member
  1185. func (c *ClusterV3) getMembers() []*pb.Member {
  1186. var mems []*pb.Member
  1187. for _, m := range c.Members {
  1188. mem := &pb.Member{
  1189. Name: m.Name,
  1190. PeerURLs: m.PeerURLs.StringSlice(),
  1191. ClientURLs: m.ClientURLs.StringSlice(),
  1192. IsLearner: m.isLearner,
  1193. }
  1194. mems = append(mems, mem)
  1195. }
  1196. return mems
  1197. }
  1198. // waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
  1199. // local 'c.Members', which is the local recording of members in the testing cluster. With
  1200. // the exception that the local recording c.Members does not have info on Member.ID, which
  1201. // is generated when the member is been added to cluster.
  1202. //
  1203. // Note:
  1204. // A successful match means the Member.clientURLs are matched. This means member has already
  1205. // finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
  1206. // write request (in v2 server). Therefore, at this point, any raft log entries prior to this
  1207. // would have already been applied.
  1208. //
  1209. // If a new member was added to an existing cluster, at this point, it has finished publishing
  1210. // its own server attributes to the cluster. And therefore by the same argument, it has already
  1211. // applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
  1212. // the new member has the correct view of the cluster configuration.
  1213. //
  1214. // Special note on learner member:
  1215. // Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
  1216. // the learner member, its initial view of the cluster created by peerURLs map does not have info
  1217. // on whether or not the new member itself is learner. But at this point, a successful match does
  1218. // indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
  1219. // which was used to add the learner itself to the cluster, and therefore it has the correct info
  1220. // on learner.
  1221. func (c *ClusterV3) waitMembersMatch(t testing.TB) {
  1222. wMembers := c.getMembers()
  1223. sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
  1224. cli := c.Client(0)
  1225. for {
  1226. resp, err := cli.MemberList(context.Background())
  1227. if err != nil {
  1228. t.Fatalf("failed to list member %v", err)
  1229. }
  1230. if len(resp.Members) != len(wMembers) {
  1231. continue
  1232. }
  1233. sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
  1234. for _, m := range resp.Members {
  1235. m.ID = 0
  1236. }
  1237. if reflect.DeepEqual(resp.Members, wMembers) {
  1238. return
  1239. }
  1240. time.Sleep(tickDuration)
  1241. }
  1242. }
  1243. type SortableProtoMemberSliceByPeerURLs []*pb.Member
  1244. func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
  1245. func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
  1246. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  1247. }
  1248. func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  1249. // MustNewMember creates a new member instance based on the response of V3 Member Add API.
  1250. func (c *ClusterV3) MustNewMember(t testing.TB, resp *clientv3.MemberAddResponse) *member {
  1251. m := c.mustNewMember(t)
  1252. m.isLearner = resp.Member.IsLearner
  1253. m.NewCluster = false
  1254. m.InitialPeerURLsMap = types.URLsMap{}
  1255. for _, mm := range c.Members {
  1256. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  1257. }
  1258. m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs)
  1259. return m
  1260. }