discovery_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "math"
  17. "math/rand"
  18. "net/http"
  19. "reflect"
  20. "sort"
  21. "strconv"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  26. "github.com/coreos/etcd/client"
  27. )
  // maxRetryInTest is the retry count used by the tests in this file: it
  // bounds the loops that advance the fake clock and is the value assigned to
  // nRetries in TestRetryFailure.
  const maxRetryInTest = 3
  31. func TestNewProxyFuncUnset(t *testing.T) {
  32. pf, err := newProxyFunc("")
  33. if pf != nil {
  34. t.Fatal("unexpected non-nil proxyFunc")
  35. }
  36. if err != nil {
  37. t.Fatalf("unexpected non-nil err: %v", err)
  38. }
  39. }
  40. func TestNewProxyFuncBad(t *testing.T) {
  41. tests := []string{
  42. "%%",
  43. "http://foo.com/%1",
  44. }
  45. for i, in := range tests {
  46. pf, err := newProxyFunc(in)
  47. if pf != nil {
  48. t.Errorf("#%d: unexpected non-nil proxyFunc", i)
  49. }
  50. if err == nil {
  51. t.Errorf("#%d: unexpected nil err", i)
  52. }
  53. }
  54. }
  55. func TestNewProxyFunc(t *testing.T) {
  56. tests := map[string]string{
  57. "bar.com": "http://bar.com",
  58. "http://disco.foo.bar": "http://disco.foo.bar",
  59. }
  60. for in, w := range tests {
  61. pf, err := newProxyFunc(in)
  62. if pf == nil {
  63. t.Errorf("%s: unexpected nil proxyFunc", in)
  64. continue
  65. }
  66. if err != nil {
  67. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  68. continue
  69. }
  70. g, err := pf(&http.Request{})
  71. if err != nil {
  72. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  73. }
  74. if g.String() != w {
  75. t.Errorf("%s: proxyURL=%q, want %q", in, g, w)
  76. }
  77. }
  78. }
  79. func TestCheckCluster(t *testing.T) {
  80. cluster := "/prefix/1000"
  81. self := "/1000/1"
  82. tests := []struct {
  83. nodes []*client.Node
  84. index uint64
  85. werr error
  86. wsize int
  87. }{
  88. {
  89. // self is in the size range
  90. []*client.Node{
  91. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  92. {Key: "/1000/_config/"},
  93. {Key: self, CreatedIndex: 2},
  94. {Key: "/1000/2", CreatedIndex: 3},
  95. {Key: "/1000/3", CreatedIndex: 4},
  96. {Key: "/1000/4", CreatedIndex: 5},
  97. },
  98. 5,
  99. nil,
  100. 3,
  101. },
  102. {
  103. // self is in the size range
  104. []*client.Node{
  105. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  106. {Key: "/1000/_config/"},
  107. {Key: "/1000/2", CreatedIndex: 2},
  108. {Key: "/1000/3", CreatedIndex: 3},
  109. {Key: self, CreatedIndex: 4},
  110. {Key: "/1000/4", CreatedIndex: 5},
  111. },
  112. 5,
  113. nil,
  114. 3,
  115. },
  116. {
  117. // self is out of the size range
  118. []*client.Node{
  119. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  120. {Key: "/1000/_config/"},
  121. {Key: "/1000/2", CreatedIndex: 2},
  122. {Key: "/1000/3", CreatedIndex: 3},
  123. {Key: "/1000/4", CreatedIndex: 4},
  124. {Key: self, CreatedIndex: 5},
  125. },
  126. 5,
  127. ErrFullCluster,
  128. 3,
  129. },
  130. {
  131. // self is not in the cluster
  132. []*client.Node{
  133. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  134. {Key: "/1000/_config/"},
  135. {Key: "/1000/2", CreatedIndex: 2},
  136. {Key: "/1000/3", CreatedIndex: 3},
  137. },
  138. 3,
  139. nil,
  140. 3,
  141. },
  142. {
  143. []*client.Node{
  144. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  145. {Key: "/1000/_config/"},
  146. {Key: "/1000/2", CreatedIndex: 2},
  147. {Key: "/1000/3", CreatedIndex: 3},
  148. {Key: "/1000/4", CreatedIndex: 4},
  149. },
  150. 3,
  151. ErrFullCluster,
  152. 3,
  153. },
  154. {
  155. // bad size key
  156. []*client.Node{
  157. {Key: "/1000/_config/size", Value: "bad", CreatedIndex: 1},
  158. },
  159. 0,
  160. ErrBadSizeKey,
  161. 0,
  162. },
  163. {
  164. // no size key
  165. []*client.Node{},
  166. 0,
  167. ErrSizeNotFound,
  168. 0,
  169. },
  170. }
  171. for i, tt := range tests {
  172. rs := make([]*client.Response, 0)
  173. if len(tt.nodes) > 0 {
  174. rs = append(rs, &client.Response{Node: tt.nodes[0], Index: tt.index})
  175. rs = append(rs, &client.Response{
  176. Node: &client.Node{
  177. Key: cluster,
  178. Nodes: tt.nodes[1:],
  179. },
  180. Index: tt.index,
  181. })
  182. }
  183. c := &clientWithResp{rs: rs}
  184. d := discovery{cluster: cluster, id: 1, c: c}
  185. cRetry := &clientWithRetry{failTimes: 3}
  186. cRetry.rs = rs
  187. fc := clockwork.NewFakeClock()
  188. dRetry := discovery{cluster: cluster, id: 1, c: cRetry, clock: fc}
  189. for _, d := range []discovery{d, dRetry} {
  190. go func() {
  191. for i := uint(1); i <= maxRetryInTest; i++ {
  192. fc.BlockUntil(1)
  193. fc.Advance(time.Second * (0x1 << i))
  194. }
  195. }()
  196. ns, size, index, err := d.checkCluster()
  197. if err != tt.werr {
  198. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  199. }
  200. if reflect.DeepEqual(ns, tt.nodes) {
  201. t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
  202. }
  203. if size != tt.wsize {
  204. t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
  205. }
  206. if index != tt.index {
  207. t.Errorf("#%d: index = %v, want %d", i, index, tt.index)
  208. }
  209. }
  210. }
  211. }
  212. func TestWaitNodes(t *testing.T) {
  213. all := client.Nodes{
  214. 0: {Key: "/1000/1", CreatedIndex: 2},
  215. 1: {Key: "/1000/2", CreatedIndex: 3},
  216. 2: {Key: "/1000/3", CreatedIndex: 4},
  217. }
  218. tests := []struct {
  219. nodes client.Nodes
  220. rs []*client.Response
  221. }{
  222. {
  223. all,
  224. []*client.Response{},
  225. },
  226. {
  227. all[:1],
  228. []*client.Response{
  229. {Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}},
  230. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  231. },
  232. },
  233. {
  234. all[:2],
  235. []*client.Response{
  236. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  237. },
  238. },
  239. {
  240. append(all, &client.Node{Key: "/1000/4", CreatedIndex: 5}),
  241. []*client.Response{
  242. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  243. },
  244. },
  245. }
  246. for i, tt := range tests {
  247. // Basic case
  248. c := &clientWithResp{nil, &watcherWithResp{tt.rs}}
  249. d := &discovery{cluster: "1000", c: c}
  250. // Retry case
  251. retryScanResp := make([]*client.Response, 0)
  252. if len(tt.nodes) > 0 {
  253. retryScanResp = append(retryScanResp, &client.Response{
  254. Node: &client.Node{
  255. Key: "1000",
  256. Value: strconv.Itoa(3),
  257. },
  258. })
  259. retryScanResp = append(retryScanResp, &client.Response{
  260. Node: &client.Node{
  261. Nodes: tt.nodes,
  262. },
  263. })
  264. }
  265. cRetry := &clientWithResp{
  266. rs: retryScanResp,
  267. w: &watcherWithRetry{rs: tt.rs, failTimes: 2},
  268. }
  269. fc := clockwork.NewFakeClock()
  270. dRetry := &discovery{
  271. cluster: "1000",
  272. c: cRetry,
  273. clock: fc,
  274. }
  275. for _, d := range []*discovery{d, dRetry} {
  276. go func() {
  277. for i := uint(1); i <= maxRetryInTest; i++ {
  278. fc.BlockUntil(1)
  279. fc.Advance(time.Second * (0x1 << i))
  280. }
  281. }()
  282. g, err := d.waitNodes(tt.nodes, 3, 0) // we do not care about index in this test
  283. if err != nil {
  284. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  285. }
  286. if !reflect.DeepEqual(g, all) {
  287. t.Errorf("#%d: all = %v, want %v", i, g, all)
  288. }
  289. }
  290. }
  291. }
  292. func TestCreateSelf(t *testing.T) {
  293. rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}}
  294. w := &watcherWithResp{rs}
  295. errw := &watcherWithErr{errors.New("watch err")}
  296. c := &clientWithResp{rs, w}
  297. errc := &clientWithErr{errors.New("create err"), w}
  298. errwc := &clientWithResp{rs, errw}
  299. tests := []struct {
  300. c client.KeysAPI
  301. werr error
  302. }{
  303. // no error
  304. {c, nil},
  305. // client.create returns an error
  306. {errc, errc.err},
  307. // watcher.next retuens an error
  308. {errwc, errw.err},
  309. }
  310. for i, tt := range tests {
  311. d := discovery{cluster: "1000", c: tt.c}
  312. if err := d.createSelf(""); err != tt.werr {
  313. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  314. }
  315. }
  316. }
  317. func TestNodesToCluster(t *testing.T) {
  318. nodes := client.Nodes{
  319. 0: {Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1},
  320. 1: {Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2},
  321. 2: {Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3},
  322. }
  323. w := "1=1.1.1.1,2=2.2.2.2,3=3.3.3.3"
  324. cluster := nodesToCluster(nodes)
  325. if !reflect.DeepEqual(cluster, w) {
  326. t.Errorf("cluster = %v, want %v", cluster, w)
  327. }
  328. }
  329. func TestSortableNodes(t *testing.T) {
  330. ns := client.Nodes{
  331. 0: {CreatedIndex: 5},
  332. 1: {CreatedIndex: 1},
  333. 2: {CreatedIndex: 3},
  334. 3: {CreatedIndex: 4},
  335. }
  336. // add some randomness
  337. for i := 0; i < 10000; i++ {
  338. ns = append(ns, &client.Node{CreatedIndex: uint64(rand.Int31())})
  339. }
  340. sns := sortableNodes{ns}
  341. sort.Sort(sns)
  342. cis := make([]int, 0)
  343. for _, n := range sns.Nodes {
  344. cis = append(cis, int(n.CreatedIndex))
  345. }
  346. if sort.IntsAreSorted(cis) != true {
  347. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  348. }
  349. cis = make([]int, 0)
  350. for _, n := range ns {
  351. cis = append(cis, int(n.CreatedIndex))
  352. }
  353. if sort.IntsAreSorted(cis) != true {
  354. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  355. }
  356. }
  357. func TestRetryFailure(t *testing.T) {
  358. nRetries = maxRetryInTest
  359. defer func() { nRetries = math.MaxUint32 }()
  360. cluster := "1000"
  361. c := &clientWithRetry{failTimes: 4}
  362. fc := clockwork.NewFakeClock()
  363. d := discovery{
  364. cluster: cluster,
  365. id: 1,
  366. c: c,
  367. clock: fc,
  368. }
  369. go func() {
  370. for i := uint(1); i <= maxRetryInTest; i++ {
  371. fc.BlockUntil(1)
  372. fc.Advance(time.Second * (0x1 << i))
  373. }
  374. }()
  375. if _, _, _, err := d.checkCluster(); err != ErrTooManyRetries {
  376. t.Errorf("err = %v, want %v", err, ErrTooManyRetries)
  377. }
  378. }
  379. type clientWithResp struct {
  380. rs []*client.Response
  381. w client.Watcher
  382. }
  383. func (c *clientWithResp) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  384. if len(c.rs) == 0 {
  385. return &client.Response{}, nil
  386. }
  387. r := c.rs[0]
  388. c.rs = c.rs[1:]
  389. return r, nil
  390. }
  391. func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response, error) {
  392. if len(c.rs) == 0 {
  393. return &client.Response{}, client.ErrKeyNoExist
  394. }
  395. r := c.rs[0]
  396. c.rs = append(c.rs[1:], r)
  397. return r, nil
  398. }
  399. func (c *clientWithResp) Watch(key string, waitIndex uint64) client.Watcher {
  400. return c.w
  401. }
  402. func (c *clientWithResp) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
  403. return c.w
  404. }
  405. type clientWithErr struct {
  406. err error
  407. w client.Watcher
  408. }
  409. func (c *clientWithErr) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  410. return &client.Response{}, c.err
  411. }
  412. func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response, error) {
  413. return &client.Response{}, c.err
  414. }
  415. func (c *clientWithErr) Watch(key string, waitIndex uint64) client.Watcher {
  416. return c.w
  417. }
  418. func (c *clientWithErr) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
  419. return c.w
  420. }
  421. type watcherWithResp struct {
  422. rs []*client.Response
  423. }
  424. func (w *watcherWithResp) Next(context.Context) (*client.Response, error) {
  425. if len(w.rs) == 0 {
  426. return &client.Response{}, nil
  427. }
  428. r := w.rs[0]
  429. w.rs = w.rs[1:]
  430. return r, nil
  431. }
  432. type watcherWithErr struct {
  433. err error
  434. }
  435. func (w *watcherWithErr) Next(context.Context) (*client.Response, error) {
  436. return &client.Response{}, w.err
  437. }
  438. // clientWithRetry will timeout all requests up to failTimes
  439. type clientWithRetry struct {
  440. clientWithResp
  441. failCount int
  442. failTimes int
  443. }
  444. func (c *clientWithRetry) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  445. if c.failCount < c.failTimes {
  446. c.failCount++
  447. return nil, client.ErrTimeout
  448. }
  449. return c.clientWithResp.Create(ctx, key, value, ttl)
  450. }
  451. func (c *clientWithRetry) Get(ctx context.Context, key string) (*client.Response, error) {
  452. if c.failCount < c.failTimes {
  453. c.failCount++
  454. return nil, client.ErrTimeout
  455. }
  456. return c.clientWithResp.Get(ctx, key)
  457. }
  458. // watcherWithRetry will timeout all requests up to failTimes
  459. type watcherWithRetry struct {
  460. rs []*client.Response
  461. failCount int
  462. failTimes int
  463. }
  464. func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
  465. if w.failCount < w.failTimes {
  466. w.failCount++
  467. return nil, client.ErrTimeout
  468. }
  469. if len(w.rs) == 0 {
  470. return &client.Response{}, nil
  471. }
  472. r := w.rs[0]
  473. w.rs = w.rs[1:]
  474. return r, nil
  475. }