etcd_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/pkg/expect"
  22. "github.com/coreos/etcd/pkg/fileutil"
  23. )
  // Defaults shared by every cluster launched from this file.
  const (
  	// etcdProcessBasePort is the first port handed out when a cluster
  	// config leaves basePort at zero.
  	etcdProcessBasePort = 20000

  	// Fixture files passed to etcd for both the client TLS flags and the
  	// peer TLS flags when auto-TLS is not requested.
  	certPath       = "../integration/fixtures/server.crt"
  	privateKeyPath = "../integration/fixtures/server.key.insecure"
  	caPath         = "../integration/fixtures/ca.crt"
  )
  // clientConnType selects the scheme(s) used for a member's client URLs.
  type clientConnType int

  const (
  	// clientNonTLS serves clients over http only.
  	clientNonTLS clientConnType = iota
  	// clientTLS serves clients over https only.
  	clientTLS
  	// clientTLSAndNonTLS advertises both an http and an https client URL.
  	clientTLSAndNonTLS
  )
  36. var (
  37. configNoTLS = etcdProcessClusterConfig{
  38. clusterSize: 3,
  39. proxySize: 0,
  40. initialToken: "new",
  41. }
  42. configAutoTLS = etcdProcessClusterConfig{
  43. clusterSize: 3,
  44. isPeerTLS: true,
  45. isPeerAutoTLS: true,
  46. initialToken: "new",
  47. }
  48. configTLS = etcdProcessClusterConfig{
  49. clusterSize: 3,
  50. proxySize: 0,
  51. clientTLS: clientTLS,
  52. isPeerTLS: true,
  53. initialToken: "new",
  54. }
  55. configClientTLS = etcdProcessClusterConfig{
  56. clusterSize: 3,
  57. proxySize: 0,
  58. clientTLS: clientTLS,
  59. initialToken: "new",
  60. }
  61. configClientBoth = etcdProcessClusterConfig{
  62. clusterSize: 1,
  63. proxySize: 0,
  64. clientTLS: clientTLSAndNonTLS,
  65. initialToken: "new",
  66. }
  67. configClientAutoTLS = etcdProcessClusterConfig{
  68. clusterSize: 1,
  69. proxySize: 0,
  70. isClientAutoTLS: true,
  71. clientTLS: clientTLS,
  72. initialToken: "new",
  73. }
  74. configPeerTLS = etcdProcessClusterConfig{
  75. clusterSize: 3,
  76. proxySize: 0,
  77. isPeerTLS: true,
  78. initialToken: "new",
  79. }
  80. configWithProxy = etcdProcessClusterConfig{
  81. clusterSize: 3,
  82. proxySize: 1,
  83. initialToken: "new",
  84. }
  85. configWithProxyTLS = etcdProcessClusterConfig{
  86. clusterSize: 3,
  87. proxySize: 1,
  88. clientTLS: clientTLS,
  89. isPeerTLS: true,
  90. initialToken: "new",
  91. }
  92. configWithProxyPeerTLS = etcdProcessClusterConfig{
  93. clusterSize: 3,
  94. proxySize: 1,
  95. isPeerTLS: true,
  96. initialToken: "new",
  97. }
  98. )
  99. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  100. ret := cfg
  101. ret.clusterSize = 1
  102. return &ret
  103. }
  104. type etcdProcessCluster struct {
  105. cfg *etcdProcessClusterConfig
  106. procs []*etcdProcess
  107. }
  108. type etcdProcess struct {
  109. cfg *etcdProcessConfig
  110. proc *expect.ExpectProcess
  111. donec chan struct{} // closed when Interact() terminates
  112. }
  // etcdProcessConfig describes how one etcd process is launched.
  type etcdProcessConfig struct {
  	// args are the command-line flags passed after the etcd binary path.
  	args []string
  	// dataDirPath is the directory given to --data-dir; it is removed when
  	// the cluster is closed.
  	dataDirPath string
  	// keepDataDir, when false, makes newEtcdProcess wipe dataDirPath before
  	// the process is spawned.
  	keepDataDir bool
  	// acurl is the client URL of the process (the http one when the process
  	// serves both http and https).
  	acurl string
  	// acurltls is the additional https client URL, set only when the etcd
  	// process serves both http and https.
  	acurltls string
  	// isProxy marks processes started with "--proxy on".
  	isProxy bool
  }
  123. type etcdProcessClusterConfig struct {
  124. dataDirPathPrefix string
  125. keepDataDir bool
  126. clusterSize int
  127. basePort int
  128. proxySize int
  129. clientTLS clientConnType
  130. isPeerTLS bool
  131. isPeerAutoTLS bool
  132. isClientAutoTLS bool
  133. forceNewCluster bool
  134. initialToken string
  135. quotaBackendBytes int64
  136. }
  137. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  138. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  139. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  140. etcdCfgs := cfg.etcdProcessConfigs()
  141. epc := &etcdProcessCluster{
  142. cfg: cfg,
  143. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  144. }
  145. // launch etcd processes
  146. for i := range etcdCfgs {
  147. proc, err := newEtcdProcess(etcdCfgs[i])
  148. if err != nil {
  149. epc.Close()
  150. return nil, err
  151. }
  152. epc.procs[i] = proc
  153. }
  154. // wait for cluster to start
  155. readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
  156. readyStr := "enabled capabilities for version"
  157. for i := range etcdCfgs {
  158. go func(etcdp *etcdProcess) {
  159. rs := readyStr
  160. if etcdp.cfg.isProxy {
  161. // rs = "proxy: listening for client requests on"
  162. rs = "proxy: endpoints found"
  163. }
  164. _, err := etcdp.proc.Expect(rs)
  165. readyC <- err
  166. close(etcdp.donec)
  167. }(epc.procs[i])
  168. }
  169. for range etcdCfgs {
  170. if err := <-readyC; err != nil {
  171. epc.Close()
  172. return nil, err
  173. }
  174. }
  175. return epc, nil
  176. }
  177. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  178. if !fileutil.Exist("../bin/etcd") {
  179. return nil, fmt.Errorf("could not find etcd binary")
  180. }
  181. if !cfg.keepDataDir {
  182. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  183. return nil, err
  184. }
  185. }
  186. child, err := spawnCmd(append([]string{"../bin/etcd"}, cfg.args...))
  187. if err != nil {
  188. return nil, err
  189. }
  190. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  191. }
  192. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  193. if cfg.basePort == 0 {
  194. cfg.basePort = etcdProcessBasePort
  195. }
  196. clientScheme := "http"
  197. if cfg.clientTLS == clientTLS {
  198. clientScheme = "https"
  199. }
  200. peerScheme := "http"
  201. if cfg.isPeerTLS {
  202. peerScheme = "https"
  203. }
  204. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  205. initialCluster := make([]string, cfg.clusterSize)
  206. for i := 0; i < cfg.clusterSize; i++ {
  207. var curls []string
  208. var curl, curltls string
  209. port := cfg.basePort + 2*i
  210. switch cfg.clientTLS {
  211. case clientNonTLS, clientTLS:
  212. curl = (&url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}).String()
  213. curls = []string{curl}
  214. case clientTLSAndNonTLS:
  215. curl = (&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port)}).String()
  216. curltls = (&url.URL{Scheme: "https", Host: fmt.Sprintf("localhost:%d", port)}).String()
  217. curls = []string{curl, curltls}
  218. }
  219. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  220. name := fmt.Sprintf("testname%d", i)
  221. var dataDirPath string
  222. if cfg.dataDirPathPrefix != "" {
  223. dataDirPath = fmt.Sprintf("%s%d.etcd", cfg.dataDirPathPrefix, i)
  224. } else {
  225. var derr error
  226. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  227. if derr != nil {
  228. panic("could not get tempdir for datadir")
  229. }
  230. }
  231. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  232. args := []string{
  233. "--name", name,
  234. "--listen-client-urls", strings.Join(curls, ","),
  235. "--advertise-client-urls", strings.Join(curls, ","),
  236. "--listen-peer-urls", purl.String(),
  237. "--initial-advertise-peer-urls", purl.String(),
  238. "--initial-cluster-token", cfg.initialToken,
  239. "--data-dir", dataDirPath,
  240. }
  241. if cfg.forceNewCluster {
  242. args = append(args, "--force-new-cluster")
  243. }
  244. if cfg.quotaBackendBytes > 0 {
  245. args = append(args,
  246. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  247. )
  248. }
  249. args = append(args, cfg.tlsArgs()...)
  250. etcdCfgs[i] = &etcdProcessConfig{
  251. args: args,
  252. dataDirPath: dataDirPath,
  253. keepDataDir: cfg.keepDataDir,
  254. acurl: curl,
  255. acurltls: curltls,
  256. }
  257. }
  258. for i := 0; i < cfg.proxySize; i++ {
  259. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  260. curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
  261. name := fmt.Sprintf("testname-proxy%d", i)
  262. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  263. if derr != nil {
  264. panic("could not get tempdir for datadir")
  265. }
  266. args := []string{
  267. "--name", name,
  268. "--proxy", "on",
  269. "--listen-client-urls", curl.String(),
  270. "--data-dir", dataDirPath,
  271. }
  272. args = append(args, cfg.tlsArgs()...)
  273. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  274. args: args,
  275. dataDirPath: dataDirPath,
  276. keepDataDir: cfg.keepDataDir,
  277. acurl: curl.String(),
  278. isProxy: true,
  279. }
  280. }
  281. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  282. for i := range etcdCfgs {
  283. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  284. }
  285. return etcdCfgs
  286. }
  287. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  288. if cfg.clientTLS != clientNonTLS {
  289. if cfg.isClientAutoTLS {
  290. args = append(args, "--auto-tls=true")
  291. } else {
  292. tlsClientArgs := []string{
  293. "--cert-file", certPath,
  294. "--key-file", privateKeyPath,
  295. "--ca-file", caPath,
  296. }
  297. args = append(args, tlsClientArgs...)
  298. }
  299. }
  300. if cfg.isPeerTLS {
  301. if cfg.isPeerAutoTLS {
  302. args = append(args, "--peer-auto-tls=true")
  303. } else {
  304. tlsPeerArgs := []string{
  305. "--peer-cert-file", certPath,
  306. "--peer-key-file", privateKeyPath,
  307. "--peer-ca-file", caPath,
  308. }
  309. args = append(args, tlsPeerArgs...)
  310. }
  311. }
  312. return args
  313. }
  314. func (epc *etcdProcessCluster) Close() (err error) {
  315. for _, p := range epc.procs {
  316. if p == nil {
  317. continue
  318. }
  319. os.RemoveAll(p.cfg.dataDirPath)
  320. if curErr := p.proc.Stop(); curErr != nil {
  321. if err != nil {
  322. err = fmt.Errorf("%v; %v", err, curErr)
  323. } else {
  324. err = curErr
  325. }
  326. }
  327. <-p.donec
  328. }
  329. return err
  330. }
  331. func spawnCmd(args []string) (*expect.ExpectProcess, error) {
  332. return expect.NewExpect(args[0], args[1:]...)
  333. }
  334. func spawnWithExpect(args []string, expected string) error {
  335. return spawnWithExpects(args, []string{expected}...)
  336. }
  337. func spawnWithExpects(args []string, xs ...string) error {
  338. proc, err := spawnCmd(args)
  339. if err != nil {
  340. return err
  341. }
  342. // process until either stdout or stderr contains
  343. // the expected string
  344. var (
  345. lines []string
  346. lineFunc = func(txt string) bool { return true }
  347. )
  348. for _, txt := range xs {
  349. for {
  350. l, err := proc.ExpectFunc(lineFunc)
  351. if err != nil {
  352. return fmt.Errorf("%v (expected %q, got %q)", err, txt, lines)
  353. }
  354. lines = append(lines, l)
  355. if strings.Contains(l, txt) {
  356. break
  357. }
  358. }
  359. }
  360. perr := proc.Close()
  361. if err != nil {
  362. return err
  363. }
  364. if len(xs) == 0 && proc.LineCount() != 0 { // expect no output
  365. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  366. }
  367. return perr
  368. }
  369. // proxies returns only the proxy etcdProcess.
  370. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  371. return epc.procs[epc.cfg.clusterSize:]
  372. }
  373. func (epc *etcdProcessCluster) backends() []*etcdProcess {
  374. return epc.procs[:epc.cfg.clusterSize]
  375. }
  376. func (epc *etcdProcessCluster) endpoints() []string {
  377. eps := make([]string, epc.cfg.clusterSize)
  378. for i, ep := range epc.backends() {
  379. eps[i] = ep.cfg.acurl
  380. }
  381. return eps
  382. }