discovery_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discovery
  14. import (
  15. "errors"
  16. "math/rand"
  17. "net/http"
  18. "reflect"
  19. "sort"
  20. "strconv"
  21. "testing"
  22. "time"
  23. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  25. "github.com/coreos/etcd/client"
  26. )
  27. func TestNewProxyFuncUnset(t *testing.T) {
  28. pf, err := newProxyFunc("")
  29. if pf != nil {
  30. t.Fatal("unexpected non-nil proxyFunc")
  31. }
  32. if err != nil {
  33. t.Fatalf("unexpected non-nil err: %v", err)
  34. }
  35. }
  36. func TestNewProxyFuncBad(t *testing.T) {
  37. tests := []string{
  38. "%%",
  39. "http://foo.com/%1",
  40. }
  41. for i, in := range tests {
  42. pf, err := newProxyFunc(in)
  43. if pf != nil {
  44. t.Errorf("#%d: unexpected non-nil proxyFunc", i)
  45. }
  46. if err == nil {
  47. t.Errorf("#%d: unexpected nil err", i)
  48. }
  49. }
  50. }
  51. func TestNewProxyFunc(t *testing.T) {
  52. tests := map[string]string{
  53. "bar.com": "http://bar.com",
  54. "http://disco.foo.bar": "http://disco.foo.bar",
  55. }
  56. for in, w := range tests {
  57. pf, err := newProxyFunc(in)
  58. if pf == nil {
  59. t.Errorf("%s: unexpected nil proxyFunc", in)
  60. continue
  61. }
  62. if err != nil {
  63. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  64. continue
  65. }
  66. g, err := pf(&http.Request{})
  67. if err != nil {
  68. t.Errorf("%s: unexpected non-nil err: %v", in, err)
  69. }
  70. if g.String() != w {
  71. t.Errorf("%s: proxyURL=%q, want %q", in, g, w)
  72. }
  73. }
  74. }
  75. func TestCheckCluster(t *testing.T) {
  76. cluster := "/prefix/1000"
  77. self := "/1000/1"
  78. tests := []struct {
  79. nodes []*client.Node
  80. werr error
  81. wsize int
  82. }{
  83. {
  84. // self is in the size range
  85. []*client.Node{
  86. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  87. {Key: "/1000/_config/"},
  88. {Key: self, CreatedIndex: 2},
  89. {Key: "/1000/2", CreatedIndex: 3},
  90. {Key: "/1000/3", CreatedIndex: 4},
  91. {Key: "/1000/4", CreatedIndex: 5},
  92. },
  93. nil,
  94. 3,
  95. },
  96. {
  97. // self is in the size range
  98. []*client.Node{
  99. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  100. {Key: "/1000/_config/"},
  101. {Key: "/1000/2", CreatedIndex: 2},
  102. {Key: "/1000/3", CreatedIndex: 3},
  103. {Key: self, CreatedIndex: 4},
  104. {Key: "/1000/4", CreatedIndex: 5},
  105. },
  106. nil,
  107. 3,
  108. },
  109. {
  110. // self is out of the size range
  111. []*client.Node{
  112. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  113. {Key: "/1000/_config/"},
  114. {Key: "/1000/2", CreatedIndex: 2},
  115. {Key: "/1000/3", CreatedIndex: 3},
  116. {Key: "/1000/4", CreatedIndex: 4},
  117. {Key: self, CreatedIndex: 5},
  118. },
  119. ErrFullCluster,
  120. 3,
  121. },
  122. {
  123. // self is not in the cluster
  124. []*client.Node{
  125. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  126. {Key: "/1000/_config/"},
  127. {Key: "/1000/2", CreatedIndex: 2},
  128. {Key: "/1000/3", CreatedIndex: 3},
  129. },
  130. nil,
  131. 3,
  132. },
  133. {
  134. []*client.Node{
  135. {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
  136. {Key: "/1000/_config/"},
  137. {Key: "/1000/2", CreatedIndex: 2},
  138. {Key: "/1000/3", CreatedIndex: 3},
  139. {Key: "/1000/4", CreatedIndex: 4},
  140. },
  141. ErrFullCluster,
  142. 3,
  143. },
  144. {
  145. // bad size key
  146. []*client.Node{
  147. {Key: "/1000/_config/size", Value: "bad", CreatedIndex: 1},
  148. },
  149. ErrBadSizeKey,
  150. 0,
  151. },
  152. {
  153. // no size key
  154. []*client.Node{},
  155. ErrSizeNotFound,
  156. 0,
  157. },
  158. }
  159. for i, tt := range tests {
  160. rs := make([]*client.Response, 0)
  161. if len(tt.nodes) > 0 {
  162. rs = append(rs, &client.Response{Node: tt.nodes[0]})
  163. rs = append(rs, &client.Response{
  164. Node: &client.Node{
  165. Key: cluster,
  166. Nodes: tt.nodes[1:],
  167. },
  168. })
  169. }
  170. c := &clientWithResp{rs: rs}
  171. d := discovery{cluster: cluster, id: 1, c: c}
  172. cRetry := &clientWithRetry{failTimes: 3}
  173. cRetry.rs = rs
  174. fc := clockwork.NewFakeClock()
  175. dRetry := discovery{cluster: cluster, id: 1, c: cRetry, clock: fc}
  176. for _, d := range []discovery{d, dRetry} {
  177. go func() {
  178. for i := uint(1); i <= nRetries; i++ {
  179. fc.BlockUntil(1)
  180. fc.Advance(time.Second * (0x1 << i))
  181. }
  182. }()
  183. ns, size, err := d.checkCluster()
  184. if err != tt.werr {
  185. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  186. }
  187. if reflect.DeepEqual(ns, tt.nodes) {
  188. t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
  189. }
  190. if size != tt.wsize {
  191. t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
  192. }
  193. }
  194. }
  195. }
  196. func TestWaitNodes(t *testing.T) {
  197. all := client.Nodes{
  198. 0: {Key: "/1000/1", CreatedIndex: 2},
  199. 1: {Key: "/1000/2", CreatedIndex: 3},
  200. 2: {Key: "/1000/3", CreatedIndex: 4},
  201. }
  202. tests := []struct {
  203. nodes client.Nodes
  204. rs []*client.Response
  205. }{
  206. {
  207. all,
  208. []*client.Response{},
  209. },
  210. {
  211. all[:1],
  212. []*client.Response{
  213. {Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}},
  214. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  215. },
  216. },
  217. {
  218. all[:2],
  219. []*client.Response{
  220. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  221. },
  222. },
  223. {
  224. append(all, &client.Node{Key: "/1000/4", CreatedIndex: 5}),
  225. []*client.Response{
  226. {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
  227. },
  228. },
  229. }
  230. for i, tt := range tests {
  231. // Basic case
  232. c := &clientWithResp{nil, &watcherWithResp{tt.rs}}
  233. d := &discovery{cluster: "1000", c: c}
  234. // Retry case
  235. retryScanResp := make([]*client.Response, 0)
  236. if len(tt.nodes) > 0 {
  237. retryScanResp = append(retryScanResp, &client.Response{
  238. Node: &client.Node{
  239. Key: "1000",
  240. Value: strconv.Itoa(3),
  241. },
  242. })
  243. retryScanResp = append(retryScanResp, &client.Response{
  244. Node: &client.Node{
  245. Nodes: tt.nodes,
  246. },
  247. })
  248. }
  249. cRetry := &clientWithResp{
  250. rs: retryScanResp,
  251. w: &watcherWithRetry{rs: tt.rs, failTimes: 2},
  252. }
  253. fc := clockwork.NewFakeClock()
  254. dRetry := &discovery{
  255. cluster: "1000",
  256. c: cRetry,
  257. clock: fc,
  258. }
  259. for _, d := range []*discovery{d, dRetry} {
  260. go func() {
  261. for i := uint(1); i <= nRetries; i++ {
  262. fc.BlockUntil(1)
  263. fc.Advance(time.Second * (0x1 << i))
  264. }
  265. }()
  266. g, err := d.waitNodes(tt.nodes, 3)
  267. if err != nil {
  268. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  269. }
  270. if !reflect.DeepEqual(g, all) {
  271. t.Errorf("#%d: all = %v, want %v", i, g, all)
  272. }
  273. }
  274. }
  275. }
  276. func TestCreateSelf(t *testing.T) {
  277. rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}}
  278. w := &watcherWithResp{rs}
  279. errw := &watcherWithErr{errors.New("watch err")}
  280. c := &clientWithResp{rs, w}
  281. errc := &clientWithErr{errors.New("create err"), w}
  282. errwc := &clientWithResp{rs, errw}
  283. tests := []struct {
  284. c client.KeysAPI
  285. werr error
  286. }{
  287. // no error
  288. {c, nil},
  289. // client.create returns an error
  290. {errc, errc.err},
  291. // watcher.next retuens an error
  292. {errwc, errw.err},
  293. }
  294. for i, tt := range tests {
  295. d := discovery{cluster: "1000", c: tt.c}
  296. if err := d.createSelf(""); err != tt.werr {
  297. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  298. }
  299. }
  300. }
  301. func TestNodesToCluster(t *testing.T) {
  302. nodes := client.Nodes{
  303. 0: {Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1},
  304. 1: {Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2},
  305. 2: {Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3},
  306. }
  307. w := "1=1.1.1.1,2=2.2.2.2,3=3.3.3.3"
  308. cluster := nodesToCluster(nodes)
  309. if !reflect.DeepEqual(cluster, w) {
  310. t.Errorf("cluster = %v, want %v", cluster, w)
  311. }
  312. }
  313. func TestSortableNodes(t *testing.T) {
  314. ns := client.Nodes{
  315. 0: {CreatedIndex: 5},
  316. 1: {CreatedIndex: 1},
  317. 2: {CreatedIndex: 3},
  318. 3: {CreatedIndex: 4},
  319. }
  320. // add some randomness
  321. for i := 0; i < 10000; i++ {
  322. ns = append(ns, &client.Node{CreatedIndex: uint64(rand.Int31())})
  323. }
  324. sns := sortableNodes{ns}
  325. sort.Sort(sns)
  326. cis := make([]int, 0)
  327. for _, n := range sns.Nodes {
  328. cis = append(cis, int(n.CreatedIndex))
  329. }
  330. if sort.IntsAreSorted(cis) != true {
  331. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  332. }
  333. cis = make([]int, 0)
  334. for _, n := range ns {
  335. cis = append(cis, int(n.CreatedIndex))
  336. }
  337. if sort.IntsAreSorted(cis) != true {
  338. t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
  339. }
  340. }
  341. func TestRetryFailure(t *testing.T) {
  342. cluster := "1000"
  343. c := &clientWithRetry{failTimes: 4}
  344. fc := clockwork.NewFakeClock()
  345. d := discovery{
  346. cluster: cluster,
  347. id: 1,
  348. c: c,
  349. clock: fc,
  350. }
  351. go func() {
  352. for i := uint(1); i <= nRetries; i++ {
  353. fc.BlockUntil(1)
  354. fc.Advance(time.Second * (0x1 << i))
  355. }
  356. }()
  357. if _, _, err := d.checkCluster(); err != ErrTooManyRetries {
  358. t.Errorf("err = %v, want %v", err, ErrTooManyRetries)
  359. }
  360. }
  361. type clientWithResp struct {
  362. rs []*client.Response
  363. w client.Watcher
  364. }
  365. func (c *clientWithResp) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  366. if len(c.rs) == 0 {
  367. return &client.Response{}, nil
  368. }
  369. r := c.rs[0]
  370. c.rs = c.rs[1:]
  371. return r, nil
  372. }
  373. func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response, error) {
  374. if len(c.rs) == 0 {
  375. return &client.Response{}, client.ErrKeyNoExist
  376. }
  377. r := c.rs[0]
  378. c.rs = append(c.rs[1:], r)
  379. return r, nil
  380. }
  381. func (c *clientWithResp) Watch(key string, waitIndex uint64) client.Watcher {
  382. return c.w
  383. }
  384. func (c *clientWithResp) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
  385. return c.w
  386. }
  387. type clientWithErr struct {
  388. err error
  389. w client.Watcher
  390. }
  391. func (c *clientWithErr) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  392. return &client.Response{}, c.err
  393. }
  394. func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response, error) {
  395. return &client.Response{}, c.err
  396. }
  397. func (c *clientWithErr) Watch(key string, waitIndex uint64) client.Watcher {
  398. return c.w
  399. }
  400. func (c *clientWithErr) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
  401. return c.w
  402. }
  403. type watcherWithResp struct {
  404. rs []*client.Response
  405. }
  406. func (w *watcherWithResp) Next(context.Context) (*client.Response, error) {
  407. if len(w.rs) == 0 {
  408. return &client.Response{}, nil
  409. }
  410. r := w.rs[0]
  411. w.rs = w.rs[1:]
  412. return r, nil
  413. }
  414. type watcherWithErr struct {
  415. err error
  416. }
  417. func (w *watcherWithErr) Next(context.Context) (*client.Response, error) {
  418. return &client.Response{}, w.err
  419. }
  420. // clientWithRetry will timeout all requests up to failTimes
  421. type clientWithRetry struct {
  422. clientWithResp
  423. failCount int
  424. failTimes int
  425. }
  426. func (c *clientWithRetry) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
  427. if c.failCount < c.failTimes {
  428. c.failCount++
  429. return nil, client.ErrTimeout
  430. }
  431. return c.clientWithResp.Create(ctx, key, value, ttl)
  432. }
  433. func (c *clientWithRetry) Get(ctx context.Context, key string) (*client.Response, error) {
  434. if c.failCount < c.failTimes {
  435. c.failCount++
  436. return nil, client.ErrTimeout
  437. }
  438. return c.clientWithResp.Get(ctx, key)
  439. }
  440. // watcherWithRetry will timeout all requests up to failTimes
  441. type watcherWithRetry struct {
  442. rs []*client.Response
  443. failCount int
  444. failTimes int
  445. }
  446. func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
  447. if w.failCount < w.failTimes {
  448. w.failCount++
  449. return nil, client.ErrTimeout
  450. }
  451. if len(w.rs) == 0 {
  452. return &client.Response{}, nil
  453. }
  454. r := w.rs[0]
  455. w.rs = w.rs[1:]
  456. return r, nil
  457. }