etcd_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. "github.com/coreos/etcd/pkg/expect"
  23. "github.com/coreos/etcd/pkg/fileutil"
  24. )
  25. const etcdProcessBasePort = 20000
  26. var (
  27. binPath string
  28. ctlBinPath string
  29. certPath string
  30. privateKeyPath string
  31. caPath string
  32. )
  33. type clientConnType int
  34. const (
  35. clientNonTLS clientConnType = iota
  36. clientTLS
  37. clientTLSAndNonTLS
  38. )
  39. var (
  40. configNoTLS = etcdProcessClusterConfig{
  41. clusterSize: 3,
  42. proxySize: 0,
  43. initialToken: "new",
  44. }
  45. configAutoTLS = etcdProcessClusterConfig{
  46. clusterSize: 3,
  47. isPeerTLS: true,
  48. isPeerAutoTLS: true,
  49. initialToken: "new",
  50. }
  51. configTLS = etcdProcessClusterConfig{
  52. clusterSize: 3,
  53. proxySize: 0,
  54. clientTLS: clientTLS,
  55. isPeerTLS: true,
  56. initialToken: "new",
  57. }
  58. configClientTLS = etcdProcessClusterConfig{
  59. clusterSize: 3,
  60. proxySize: 0,
  61. clientTLS: clientTLS,
  62. initialToken: "new",
  63. }
  64. configClientBoth = etcdProcessClusterConfig{
  65. clusterSize: 1,
  66. proxySize: 0,
  67. clientTLS: clientTLSAndNonTLS,
  68. initialToken: "new",
  69. }
  70. configClientAutoTLS = etcdProcessClusterConfig{
  71. clusterSize: 1,
  72. proxySize: 0,
  73. isClientAutoTLS: true,
  74. clientTLS: clientTLS,
  75. initialToken: "new",
  76. }
  77. configPeerTLS = etcdProcessClusterConfig{
  78. clusterSize: 3,
  79. proxySize: 0,
  80. isPeerTLS: true,
  81. initialToken: "new",
  82. }
  83. configWithProxy = etcdProcessClusterConfig{
  84. clusterSize: 3,
  85. proxySize: 1,
  86. initialToken: "new",
  87. }
  88. configWithProxyTLS = etcdProcessClusterConfig{
  89. clusterSize: 3,
  90. proxySize: 1,
  91. clientTLS: clientTLS,
  92. isPeerTLS: true,
  93. initialToken: "new",
  94. }
  95. configWithProxyPeerTLS = etcdProcessClusterConfig{
  96. clusterSize: 3,
  97. proxySize: 1,
  98. isPeerTLS: true,
  99. initialToken: "new",
  100. }
  101. )
  102. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  103. ret := cfg
  104. ret.clusterSize = 1
  105. return &ret
  106. }
  107. type etcdProcessCluster struct {
  108. cfg *etcdProcessClusterConfig
  109. procs []*etcdProcess
  110. }
  111. type etcdProcess struct {
  112. cfg *etcdProcessConfig
  113. proc *expect.ExpectProcess
  114. donec chan struct{} // closed when Interact() terminates
  115. }
  116. type etcdProcessConfig struct {
  117. execPath string
  118. args []string
  119. dataDirPath string
  120. keepDataDir bool
  121. purl url.URL
  122. acurl string
  123. // additional url for tls connection when the etcd process
  124. // serves both http and https
  125. acurltls string
  126. acurlHost string
  127. isProxy bool
  128. }
  129. type etcdProcessClusterConfig struct {
  130. execPath string
  131. dataDirPath string
  132. keepDataDir bool
  133. clusterSize int
  134. baseScheme string
  135. basePort int
  136. proxySize int
  137. snapCount int // default is 10000
  138. clientTLS clientConnType
  139. clientCertAuthEnabled bool
  140. isPeerTLS bool
  141. isPeerAutoTLS bool
  142. isClientAutoTLS bool
  143. forceNewCluster bool
  144. initialToken string
  145. quotaBackendBytes int64
  146. noStrictReconfig bool
  147. }
  148. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  149. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  150. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  151. etcdCfgs := cfg.etcdProcessConfigs()
  152. epc := &etcdProcessCluster{
  153. cfg: cfg,
  154. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  155. }
  156. // launch etcd processes
  157. for i := range etcdCfgs {
  158. proc, err := newEtcdProcess(etcdCfgs[i])
  159. if err != nil {
  160. epc.Close()
  161. return nil, err
  162. }
  163. epc.procs[i] = proc
  164. }
  165. return epc, epc.Start()
  166. }
  167. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  168. if !fileutil.Exist(cfg.execPath) {
  169. return nil, fmt.Errorf("could not find etcd binary")
  170. }
  171. if !cfg.keepDataDir {
  172. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  173. return nil, err
  174. }
  175. }
  176. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  177. if err != nil {
  178. return nil, err
  179. }
  180. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  181. }
  182. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  183. binPath = binDir + "/etcd"
  184. ctlBinPath = binDir + "/etcdctl"
  185. certPath = certDir + "/server.crt"
  186. privateKeyPath = certDir + "/server.key.insecure"
  187. caPath = certDir + "/ca.crt"
  188. if cfg.basePort == 0 {
  189. cfg.basePort = etcdProcessBasePort
  190. }
  191. if cfg.execPath == "" {
  192. cfg.execPath = binPath
  193. }
  194. if cfg.snapCount == 0 {
  195. cfg.snapCount = etcdserver.DefaultSnapCount
  196. }
  197. clientScheme := "http"
  198. if cfg.clientTLS == clientTLS {
  199. clientScheme = "https"
  200. }
  201. peerScheme := cfg.baseScheme
  202. if peerScheme == "" {
  203. peerScheme = "http"
  204. }
  205. if cfg.isPeerTLS {
  206. peerScheme += "s"
  207. }
  208. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  209. initialCluster := make([]string, cfg.clusterSize)
  210. for i := 0; i < cfg.clusterSize; i++ {
  211. var curls []string
  212. var curl, curltls string
  213. port := cfg.basePort + 2*i
  214. curlHost := fmt.Sprintf("localhost:%d", port)
  215. switch cfg.clientTLS {
  216. case clientNonTLS, clientTLS:
  217. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  218. curls = []string{curl}
  219. case clientTLSAndNonTLS:
  220. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  221. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  222. curls = []string{curl, curltls}
  223. }
  224. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  225. name := fmt.Sprintf("testname%d", i)
  226. dataDirPath := cfg.dataDirPath
  227. if cfg.dataDirPath == "" {
  228. var derr error
  229. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  230. if derr != nil {
  231. panic("could not get tempdir for datadir")
  232. }
  233. }
  234. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  235. args := []string{
  236. "--name", name,
  237. "--listen-client-urls", strings.Join(curls, ","),
  238. "--advertise-client-urls", strings.Join(curls, ","),
  239. "--listen-peer-urls", purl.String(),
  240. "--initial-advertise-peer-urls", purl.String(),
  241. "--initial-cluster-token", cfg.initialToken,
  242. "--data-dir", dataDirPath,
  243. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  244. }
  245. if cfg.forceNewCluster {
  246. args = append(args, "--force-new-cluster")
  247. }
  248. if cfg.quotaBackendBytes > 0 {
  249. args = append(args,
  250. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  251. )
  252. }
  253. if cfg.noStrictReconfig {
  254. args = append(args, "--strict-reconfig-check=false")
  255. }
  256. args = append(args, cfg.tlsArgs()...)
  257. etcdCfgs[i] = &etcdProcessConfig{
  258. execPath: cfg.execPath,
  259. args: args,
  260. dataDirPath: dataDirPath,
  261. keepDataDir: cfg.keepDataDir,
  262. purl: purl,
  263. acurl: curl,
  264. acurltls: curltls,
  265. acurlHost: curlHost,
  266. }
  267. }
  268. for i := 0; i < cfg.proxySize; i++ {
  269. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  270. curlHost := fmt.Sprintf("localhost:%d", port)
  271. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  272. name := fmt.Sprintf("testname-proxy%d", i)
  273. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  274. if derr != nil {
  275. panic("could not get tempdir for datadir")
  276. }
  277. args := []string{
  278. "--name", name,
  279. "--proxy", "on",
  280. "--listen-client-urls", curl.String(),
  281. "--data-dir", dataDirPath,
  282. }
  283. args = append(args, cfg.tlsArgs()...)
  284. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  285. execPath: cfg.execPath,
  286. args: args,
  287. dataDirPath: dataDirPath,
  288. keepDataDir: cfg.keepDataDir,
  289. acurl: curl.String(),
  290. acurlHost: curlHost,
  291. isProxy: true,
  292. }
  293. }
  294. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  295. for i := range etcdCfgs {
  296. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  297. }
  298. return etcdCfgs
  299. }
  300. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  301. if cfg.clientTLS != clientNonTLS {
  302. if cfg.isClientAutoTLS {
  303. args = append(args, "--auto-tls=true")
  304. } else {
  305. tlsClientArgs := []string{
  306. "--cert-file", certPath,
  307. "--key-file", privateKeyPath,
  308. "--ca-file", caPath,
  309. }
  310. args = append(args, tlsClientArgs...)
  311. if cfg.clientCertAuthEnabled {
  312. args = append(args, "--client-cert-auth")
  313. }
  314. }
  315. }
  316. if cfg.isPeerTLS {
  317. if cfg.isPeerAutoTLS {
  318. args = append(args, "--peer-auto-tls=true")
  319. } else {
  320. tlsPeerArgs := []string{
  321. "--peer-cert-file", certPath,
  322. "--peer-key-file", privateKeyPath,
  323. "--peer-ca-file", caPath,
  324. }
  325. args = append(args, tlsPeerArgs...)
  326. }
  327. }
  328. return args
  329. }
  330. func (epc *etcdProcessCluster) Start() (err error) {
  331. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  332. for i := range epc.procs {
  333. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  334. }
  335. for range epc.procs {
  336. if err := <-readyC; err != nil {
  337. epc.Close()
  338. return err
  339. }
  340. }
  341. return nil
  342. }
  343. func (epc *etcdProcessCluster) RestartAll() error {
  344. for i := range epc.procs {
  345. proc, err := newEtcdProcess(epc.procs[i].cfg)
  346. if err != nil {
  347. epc.Close()
  348. return err
  349. }
  350. epc.procs[i] = proc
  351. }
  352. return epc.Start()
  353. }
  354. func (epc *etcdProcessCluster) StopAll() (err error) {
  355. for _, p := range epc.procs {
  356. if p == nil {
  357. continue
  358. }
  359. if curErr := p.Stop(); curErr != nil {
  360. if err != nil {
  361. err = fmt.Errorf("%v; %v", err, curErr)
  362. } else {
  363. err = curErr
  364. }
  365. }
  366. }
  367. return err
  368. }
  369. func (epc *etcdProcessCluster) Close() error {
  370. err := epc.StopAll()
  371. for _, p := range epc.procs {
  372. os.RemoveAll(p.cfg.dataDirPath)
  373. }
  374. return err
  375. }
  376. func (ep *etcdProcess) Restart() error {
  377. newEp, err := newEtcdProcess(ep.cfg)
  378. if err != nil {
  379. ep.Stop()
  380. return err
  381. }
  382. *ep = *newEp
  383. if err = ep.waitReady(); err != nil {
  384. ep.Stop()
  385. return err
  386. }
  387. return nil
  388. }
  389. func (ep *etcdProcess) Stop() error {
  390. if ep == nil {
  391. return nil
  392. }
  393. if err := ep.proc.Stop(); err != nil {
  394. return err
  395. }
  396. <-ep.donec
  397. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  398. os.RemoveAll(ep.cfg.purl.Host)
  399. }
  400. return nil
  401. }
  402. func (ep *etcdProcess) waitReady() error {
  403. readyStrs := []string{"enabled capabilities for version", "published"}
  404. if ep.cfg.isProxy {
  405. readyStrs = []string{"httpproxy: endpoints found"}
  406. }
  407. c := 0
  408. matchSet := func(l string) bool {
  409. for _, s := range readyStrs {
  410. if strings.Contains(l, s) {
  411. c++
  412. break
  413. }
  414. }
  415. return c == len(readyStrs)
  416. }
  417. _, err := ep.proc.ExpectFunc(matchSet)
  418. close(ep.donec)
  419. return err
  420. }
  421. func spawnCmd(args []string) (*expect.ExpectProcess, error) {
  422. return expect.NewExpect(args[0], args[1:]...)
  423. }
  424. func spawnWithExpect(args []string, expected string) error {
  425. return spawnWithExpects(args, []string{expected}...)
  426. }
  427. func spawnWithExpects(args []string, xs ...string) error {
  428. proc, err := spawnCmd(args)
  429. if err != nil {
  430. return err
  431. }
  432. // process until either stdout or stderr contains
  433. // the expected string
  434. var (
  435. lines []string
  436. lineFunc = func(txt string) bool { return true }
  437. )
  438. for _, txt := range xs {
  439. for {
  440. l, err := proc.ExpectFunc(lineFunc)
  441. if err != nil {
  442. return fmt.Errorf("%v (expected %q, got %q)", err, txt, lines)
  443. }
  444. lines = append(lines, l)
  445. if strings.Contains(l, txt) {
  446. break
  447. }
  448. }
  449. }
  450. perr := proc.Close()
  451. if err != nil {
  452. return err
  453. }
  454. if len(xs) == 0 && proc.LineCount() != 0 { // expect no output
  455. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  456. }
  457. return perr
  458. }
  459. // proxies returns only the proxy etcdProcess.
  460. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  461. return epc.procs[epc.cfg.clusterSize:]
  462. }
  463. func (epc *etcdProcessCluster) backends() []*etcdProcess {
  464. return epc.procs[:epc.cfg.clusterSize]
  465. }
  466. func (epc *etcdProcessCluster) endpoints() []string {
  467. eps := make([]string, epc.cfg.clusterSize)
  468. for i, ep := range epc.backends() {
  469. eps[i] = ep.cfg.acurl
  470. }
  471. return eps
  472. }
  473. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  474. eps := make([]string, epc.cfg.clusterSize)
  475. for i, ep := range epc.backends() {
  476. eps[i] = ep.cfg.acurlHost
  477. }
  478. return eps
  479. }